Merge pull request #26742 from taosdata/opti/TD-31017-tq

fix:[TD-31017]process return value in client for tmq
This commit is contained in:
dapan1121 2024-07-25 09:32:21 +08:00 committed by GitHub
commit 3f2239ef03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 584 additions and 375 deletions

View File

@ -4041,7 +4041,9 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp; SMqSubTopicEp topicEp;
buf = tDecodeMqSubTopicEp(buf, &topicEp); buf = tDecodeMqSubTopicEp(buf, &topicEp);
taosArrayPush(pRsp->topics, &topicEp); if (taosArrayPush(pRsp->topics, &topicEp) == NULL) {
return NULL;
}
} }
return buf; return buf;
} }

View File

@ -332,8 +332,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
return (SReqResultInfo*)&msg->common.resInfo; return (SReqResultInfo*)&msg->common.resInfo;
} }
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4); int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo);
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo); if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
return tmqGetCurResInfo(res); return tmqGetCurResInfo(res);

View File

@ -426,9 +426,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return doAsyncFetchRows(pRequest, true, true); return doAsyncFetchRows(pRequest, true, true);
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj *msg = ((SMqRspObj *)res); SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo; SReqResultInfo *pResultInfo = NULL;
if (msg->common.resIter == -1) { if (msg->common.resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true); if(tmqGetNextResInfo(res, true, &pResultInfo) != 0){
return NULL;
}
} else { } else {
pResultInfo = tmqGetCurResInfo(res); pResultInfo = tmqGetCurResInfo(res);
} }
@ -438,8 +440,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo->current += 1; pResultInfo->current += 1;
return pResultInfo->row; return pResultInfo->row;
} else { } else {
pResultInfo = tmqGetNextResInfo(res, true); if (tmqGetNextResInfo(res, true, &pResultInfo) != 0){
if (pResultInfo == NULL) {
return NULL; return NULL;
} }
@ -754,8 +755,9 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
(*numOfRows) = pResultInfo->numOfRows; (*numOfRows) = pResultInfo->numOfRows;
return pRequest->code; return pRequest->code;
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true); SReqResultInfo *pResultInfo = NULL;
if (pResultInfo == NULL) return -1; int32_t code = tmqGetNextResInfo(res, true, &pResultInfo);
if (code != 0) return code;
pResultInfo->current = pResultInfo->numOfRows; pResultInfo->current = pResultInfo->numOfRows;
(*rows) = pResultInfo->row; (*rows) = pResultInfo->row;
@ -776,8 +778,9 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
} }
if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false); SReqResultInfo *pResultInfo = NULL;
if (pResultInfo == NULL) { int32_t code = tmqGetNextResInfo(res, false, &pResultInfo);
if (code != 0) {
(*numOfRows) = 0; (*numOfRows) = 0;
return 0; return 0;
} }

File diff suppressed because it is too large Load Diff

View File

@ -9248,7 +9248,7 @@ void tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
*pLeft = *pRight; *pLeft = *pRight;
if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) { if (IS_VAR_DATA_TYPE(pRight->primaryKey.type)) {
pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData); pLeft->primaryKey.pData = taosMemoryMalloc(pRight->primaryKey.nData);
memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData); (void)memcpy(pLeft->primaryKey.pData, pRight->primaryKey.pData, pRight->primaryKey.nData);
} }
} }