Merge pull request #27486 from taosdata/enh/TD-31691
Modify error code passing for taosMemoryCalloc function
This commit is contained in:
commit
91094d2dd0
|
@ -477,7 +477,7 @@ int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t
|
||||||
STscObj **pObj) {
|
STscObj **pObj) {
|
||||||
*pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
|
*pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
|
||||||
if (NULL == *pObj) {
|
if (NULL == *pObj) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
(*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -556,7 +556,7 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj
|
||||||
(*pRequest)->inCallback = false;
|
(*pRequest)->inCallback = false;
|
||||||
(*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
(*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||||
if (NULL == (*pRequest)->msgBuf) {
|
if (NULL == (*pRequest)->msgBuf) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
(*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
|
(*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||||
|
|
|
@ -169,8 +169,7 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
|
SDBVgInfo *vgInfo = taosMemoryCalloc(1, sizeof(SDBVgInfo));
|
||||||
if (NULL == vgInfo) {
|
if (NULL == vgInfo) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vgInfo->vgVersion = rsp->vgVersion;
|
vgInfo->vgVersion = rsp->vgVersion;
|
||||||
|
@ -713,7 +712,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
||||||
if (NULL == hbBasic) {
|
if (NULL == hbBasic) {
|
||||||
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
|
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
|
||||||
releaseTscObj(connKey->tscRid);
|
releaseTscObj(connKey->tscRid);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
hbBasic->connId = pTscObj->connId;
|
hbBasic->connId = pTscObj->connId;
|
||||||
|
@ -1174,7 +1173,7 @@ static FORCE_INLINE void hbMgrInitHandle() {
|
||||||
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
|
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
|
||||||
*pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
|
*pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
|
||||||
if (pBatchReq == NULL) {
|
if (pBatchReq == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
|
||||||
(*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
|
(*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
|
||||||
|
|
|
@ -135,7 +135,7 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass
|
||||||
if (pInst == NULL) {
|
if (pInst == NULL) {
|
||||||
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
|
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
|
||||||
if (NULL == p) {
|
if (NULL == p) {
|
||||||
TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
TSC_ERR_JRET(terrno);
|
||||||
}
|
}
|
||||||
p->mgmtEp = epSet;
|
p->mgmtEp = epSet;
|
||||||
code = taosThreadMutexInit(&p->qnodeMutex, NULL);
|
code = taosThreadMutexInit(&p->qnodeMutex, NULL);
|
||||||
|
@ -513,9 +513,11 @@ int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32
|
||||||
taosMemoryFree(pResInfo->userFields);
|
taosMemoryFree(pResInfo->userFields);
|
||||||
}
|
}
|
||||||
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
|
if(NULL == pResInfo->fields) return terrno;
|
||||||
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
if (NULL == pResInfo->fields || NULL == pResInfo->userFields) {
|
if (NULL == pResInfo->userFields) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
taosMemoryFree(pResInfo->fields);
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
if (numOfCols != pResInfo->numOfCols) {
|
if (numOfCols != pResInfo->numOfCols) {
|
||||||
tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
|
tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
|
||||||
|
@ -1594,7 +1596,7 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
|
||||||
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
|
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
|
||||||
*pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
*pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (*pMsgSendInfo == NULL) {
|
if (*pMsgSendInfo == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
|
(*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
|
||||||
|
@ -1605,7 +1607,7 @@ static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInf
|
||||||
(*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
|
(*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
|
||||||
if (NULL == (*pMsgSendInfo)->param) {
|
if (NULL == (*pMsgSendInfo)->param) {
|
||||||
taosMemoryFree(*pMsgSendInfo);
|
taosMemoryFree(*pMsgSendInfo);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
*(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
|
*(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
|
||||||
|
@ -1981,7 +1983,11 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
||||||
pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
|
pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
|
||||||
|
|
||||||
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
|
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
taosMemoryFree(pResInfo->row);
|
||||||
|
taosMemoryFree(pResInfo->pCol);
|
||||||
|
taosMemoryFree(pResInfo->length);
|
||||||
|
taosMemoryFree(pResInfo->convertBuf);
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2129,7 +2135,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
|
|
||||||
taosMemoryFreeClear(pResultInfo->convertJson);
|
taosMemoryFreeClear(pResultInfo->convertJson);
|
||||||
pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
|
pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
|
||||||
if (pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (pResultInfo->convertJson == NULL) return terrno;
|
||||||
char* p1 = pResultInfo->convertJson;
|
char* p1 = pResultInfo->convertJson;
|
||||||
|
|
||||||
int32_t totalLen = 0;
|
int32_t totalLen = 0;
|
||||||
|
|
|
@ -965,7 +965,7 @@ int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
||||||
if (pTarget == NULL) {
|
if (pTarget == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
} else {
|
} else {
|
||||||
pTarget->pDbVgroup = taosArrayDup(pSrc->pDbVgroup, NULL);
|
pTarget->pDbVgroup = taosArrayDup(pSrc->pDbVgroup, NULL);
|
||||||
pTarget->pDbCfg = taosArrayDup(pSrc->pDbCfg, NULL);
|
pTarget->pDbCfg = taosArrayDup(pSrc->pDbCfg, NULL);
|
||||||
|
@ -1174,7 +1174,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SS
|
||||||
|
|
||||||
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
|
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
|
||||||
if (*pCxt == NULL) {
|
if (*pCxt == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
**pCxt = (SParseContext){.requestId = pRequest->requestId,
|
**pCxt = (SParseContext){.requestId = pRequest->requestId,
|
||||||
|
@ -1208,7 +1208,7 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
|
||||||
STscObj *pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
|
SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
} else {
|
} else {
|
||||||
pWrapper->pRequest = pRequest;
|
pWrapper->pRequest = pRequest;
|
||||||
pRequest->pWrapper = pWrapper;
|
pRequest->pWrapper = pWrapper;
|
||||||
|
@ -1229,7 +1229,7 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
|
||||||
|
|
||||||
pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
|
||||||
if (pWrapper->pCatalogReq == NULL) {
|
if (pWrapper->pCatalogReq == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
} else {
|
} else {
|
||||||
pWrapper->pCatalogReq->forceUpdate = updateMetaForce;
|
pWrapper->pCatalogReq->forceUpdate = updateMetaForce;
|
||||||
TSC_ERR_RET(qnodeRequired(pRequest, &pWrapper->pCatalogReq->qNodeRequired));
|
TSC_ERR_RET(qnodeRequired(pRequest, &pWrapper->pCatalogReq->qNodeRequired));
|
||||||
|
|
|
@ -201,7 +201,7 @@ End:
|
||||||
|
|
||||||
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
|
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
|
||||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
|
if(pMsgSendInfo == NULL) return pMsgSendInfo;
|
||||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||||
pMsgSendInfo->requestId = pRequest->requestId;
|
pMsgSendInfo->requestId = pRequest->requestId;
|
||||||
pMsgSendInfo->param = pRequest;
|
pMsgSendInfo->param = pRequest;
|
||||||
|
@ -507,7 +507,7 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t line = 0;
|
int32_t line = 0;
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
|
TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
|
||||||
pBlock->info.hasVarCol = true;
|
pBlock->info.hasVarCol = true;
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
|
||||||
|
@ -658,7 +658,7 @@ static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t line = 0;
|
int32_t line = 0;
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY);
|
TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
|
||||||
pBlock->info.hasVarCol = true;
|
pBlock->info.hasVarCol = true;
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
|
||||||
|
|
|
@ -1900,7 +1900,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
|
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
|
||||||
if (fields == NULL) {
|
if (fields == NULL) {
|
||||||
SET_ERROR_MSG("calloc fields failed");
|
SET_ERROR_MSG("calloc fields failed");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < pSW->nCols; i++) {
|
for (int i = 0; i < pSW->nCols; i++) {
|
||||||
|
@ -2059,7 +2059,7 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra
|
||||||
len += sizeof(int8_t) + sizeof(int32_t);
|
len += sizeof(int8_t) + sizeof(int32_t);
|
||||||
buf = taosMemoryCalloc(1, len);
|
buf = taosMemoryCalloc(1, len);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto FAILED;
|
goto FAILED;
|
||||||
}
|
}
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
||||||
}
|
}
|
||||||
char* tmp = taosMemoryCalloc(pVal->length, 1);
|
char* tmp = taosMemoryCalloc(pVal->length, 1);
|
||||||
if (tmp == NULL){
|
if (tmp == NULL){
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
(void)memcpy(tmp, pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN);
|
(void)memcpy(tmp, pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN);
|
||||||
code = doGeomFromText(tmp, (unsigned char **)&pVal->value, &pVal->length);
|
code = doGeomFromText(tmp, (unsigned char **)&pVal->value, &pVal->length);
|
||||||
|
|
|
@ -506,7 +506,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||||
|
@ -526,7 +526,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
||||||
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pParam->params = pParamSet;
|
pParam->params = pParamSet;
|
||||||
|
@ -540,7 +540,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
|
||||||
if (pMsgSendInfo == NULL) {
|
if (pMsgSendInfo == NULL) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
|
pMsgSendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL};
|
||||||
|
@ -581,7 +581,7 @@ static int32_t prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, voi
|
||||||
SMqCommitCbParamSet** ppParamSet) {
|
SMqCommitCbParamSet** ppParamSet) {
|
||||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||||
if (pParamSet == NULL) {
|
if (pParamSet == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pParamSet->refId = tmq->refId;
|
pParamSet->refId = tmq->refId;
|
||||||
|
@ -1382,7 +1382,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
}
|
}
|
||||||
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
|
||||||
if (topicFName == NULL) {
|
if (topicFName == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1414,7 +1414,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
|
|
||||||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
@ -1843,7 +1843,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
|
||||||
int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) {
|
int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj** ppRspObj) {
|
||||||
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
|
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
|
||||||
if (pRspObj == NULL) {
|
if (pRspObj == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pRspObj->resType = RES_TYPE__TMQ_META;
|
pRspObj->resType = RES_TYPE__TMQ_META;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -1858,7 +1858,7 @@ int32_t tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqMetaRspObj**
|
||||||
int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) {
|
int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMetaRspObj** ppRspObj) {
|
||||||
SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj));
|
SMqBatchMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqBatchMetaRspObj));
|
||||||
if (pRspObj == NULL) {
|
if (pRspObj == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META;
|
pRspObj->common.resType = RES_TYPE__TMQ_BATCH_META;
|
||||||
tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->common.topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -1969,7 +1969,7 @@ int32_t tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, in
|
||||||
SMqRspObj** ppRspObj) {
|
SMqRspObj** ppRspObj) {
|
||||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||||
if (pRspObj == NULL) {
|
if (pRspObj == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pRspObj->common.resType = RES_TYPE__TMQ;
|
pRspObj->common.resType = RES_TYPE__TMQ;
|
||||||
(void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
(void)memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
||||||
|
@ -1982,7 +1982,7 @@ int32_t tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pV
|
||||||
SMqTaosxRspObj** ppRspObj) {
|
SMqTaosxRspObj** ppRspObj) {
|
||||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||||
if (pRspObj == NULL) {
|
if (pRspObj == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
pRspObj->common.resType = RES_TYPE__TMQ_METADATA;
|
pRspObj->common.resType = RES_TYPE__TMQ_METADATA;
|
||||||
(void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
(void)memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
||||||
|
@ -2008,8 +2008,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
||||||
|
|
||||||
msg = taosMemoryCalloc(1, msgSize);
|
msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
||||||
|
@ -2032,10 +2031,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
||||||
|
|
||||||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
taosMemoryFreeClear(pParam);
|
taosMemoryFreeClear(pParam);
|
||||||
taosMemoryFreeClear(msg);
|
taosMemoryFreeClear(msg);
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
|
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
|
||||||
|
@ -2956,7 +2954,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
||||||
pReq = taosMemoryCalloc(1, tlen);
|
pReq = taosMemoryCalloc(1, tlen);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
|
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
|
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
|
||||||
|
@ -2969,7 +2967,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
|
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pParam->refId = pTmq->refId;
|
pParam->refId = pTmq->refId;
|
||||||
|
@ -2980,7 +2978,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
|
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
|
||||||
|
@ -3175,7 +3173,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||||
|
@ -3195,14 +3193,14 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
||||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
|
SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
taosMemoryFree(sendInfo);
|
taosMemoryFree(sendInfo);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
if (tsem2_init(&pParam->sem, 0, 0) != 0) {
|
if (tsem2_init(&pParam->sem, 0, 0) != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
@ -3393,7 +3391,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
if (*assignment == NULL) {
|
if (*assignment == NULL) {
|
||||||
tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
|
tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
|
||||||
(*numOfAssignment) * sizeof(tmq_topic_assignment));
|
(*numOfAssignment) * sizeof(tmq_topic_assignment));
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3421,7 +3419,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
if (needFetch) {
|
if (needFetch) {
|
||||||
pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
|
pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
|
||||||
if (pCommon == NULL) {
|
if (pCommon == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3468,7 +3466,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
char* msg = taosMemoryCalloc(1, msgSize);
|
char* msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3483,7 +3481,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3631,7 +3629,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
||||||
|
|
||||||
char* msg = taosMemoryCalloc(1, msgSize);
|
char* msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
|
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
|
||||||
|
@ -3642,7 +3640,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
||||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (sendInfo == NULL) {
|
if (sendInfo == NULL) {
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
|
SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
|
||||||
|
|
Loading…
Reference in New Issue