fix:data compatibility in tmq
This commit is contained in:
parent
4722f8be87
commit
71292f45ef
|
@ -1626,6 +1626,22 @@ void changeByteEndian(char* pData){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, int64_t *rows, int32_t *precision){
|
||||||
|
if(*(int64_t*)pRetrieve == 0){
|
||||||
|
*rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
|
||||||
|
*rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
|
||||||
|
if(precision != NULL){
|
||||||
|
*precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
|
||||||
|
}
|
||||||
|
}else if(*(int64_t*)pRetrieve == 1){
|
||||||
|
*rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
|
||||||
|
*rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
|
||||||
|
if(precision != NULL){
|
||||||
|
*precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
|
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
|
||||||
(*numOfRows) = 0;
|
(*numOfRows) = 0;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -1648,13 +1664,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
|
||||||
void* rawData = NULL;
|
void* rawData = NULL;
|
||||||
int64_t rows = 0;
|
int64_t rows = 0;
|
||||||
// deal with compatibility
|
// deal with compatibility
|
||||||
if(*(int64_t*)pRetrieve == 0){
|
tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
|
||||||
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
|
|
||||||
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
|
|
||||||
}else if(*(int64_t*)pRetrieve == 1){
|
|
||||||
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
|
|
||||||
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
pVg->numOfRows += rows;
|
pVg->numOfRows += rows;
|
||||||
(*numOfRows) += rows;
|
(*numOfRows) += rows;
|
||||||
|
@ -2625,18 +2635,22 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
|
||||||
pRspObj->resIter++;
|
pRspObj->resIter++;
|
||||||
|
|
||||||
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
|
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
|
||||||
SRetrieveTableRspForTmq* pRetrieveTmq =
|
|
||||||
(SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
|
|
||||||
if (pRspObj->rsp.withSchema) {
|
if (pRspObj->rsp.withSchema) {
|
||||||
doFreeReqResultInfo(&pRspObj->resInfo);
|
doFreeReqResultInfo(&pRspObj->resInfo);
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
|
||||||
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
|
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRspObj->resInfo.pData = (void*)pRetrieveTmq->data;
|
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
|
||||||
pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows);
|
void* rawData = NULL;
|
||||||
|
int64_t rows = 0;
|
||||||
|
int32_t precision = 0;
|
||||||
|
tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
|
||||||
|
|
||||||
|
pRspObj->resInfo.pData = rawData;
|
||||||
|
pRspObj->resInfo.numOfRows = rows;
|
||||||
pRspObj->resInfo.current = 0;
|
pRspObj->resInfo.current = 0;
|
||||||
pRspObj->resInfo.precision = pRetrieveTmq->precision;
|
pRspObj->resInfo.precision = precision;
|
||||||
|
|
||||||
// TODO handle the compressed case
|
// TODO handle the compressed case
|
||||||
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
|
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
|
||||||
|
|
Loading…
Reference in New Issue