diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c8fc556150..ce149921e3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1626,6 +1626,22 @@ void changeByteEndian(char* pData){ } } +static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, int64_t *rows, int32_t *precision){ + if(*(int64_t*)pRetrieve == 0){ + *rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); + if(precision != NULL){ + *precision = ((SRetrieveTableRsp*)pRetrieve)->precision; + } + }else if(*(int64_t*)pRetrieve == 1){ + *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); + if(precision != NULL){ + *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision; + } + } +} + static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) { (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); @@ -1648,13 +1664,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg void* rawData = NULL; int64_t rows = 0; // deal with compatibility - if(*(int64_t*)pRetrieve == 0){ - rawData = ((SRetrieveTableRsp*)pRetrieve)->data; - rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); - }else if(*(int64_t*)pRetrieve == 1){ - rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; - rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); - } + tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL); pVg->numOfRows += rows; (*numOfRows) += rows; @@ -2625,18 +2635,22 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { pRspObj->resIter++; if (pRspObj->resIter < pRspObj->rsp.blockNum) { - SRetrieveTableRspForTmq* pRetrieveTmq = - (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); if (pRspObj->rsp.withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); } - pRspObj->resInfo.pData = (void*)pRetrieveTmq->data; - pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows); + void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + void* rawData = NULL; + int64_t rows = 0; + int32_t precision = 0; + tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision); + + pRspObj->resInfo.pData = rawData; + pRspObj->resInfo.numOfRows = rows; pRspObj->resInfo.current = 0; - pRspObj->resInfo.precision = pRetrieveTmq->precision; + pRspObj->resInfo.precision = precision; // TODO handle the compressed case pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;