opti:[TD-28118] raw block data for tmq

This commit is contained in:
wangmm0220 2024-01-18 14:17:08 +08:00
parent 366de880a8
commit 1b85adaa3a
1 changed files with 7 additions and 4 deletions

View File

@ -1577,16 +1577,19 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
// extract the rows in this data packet
bool needTransformSchema = !pRspObj->rsp.withSchema;
if (!pRspObj->rsp.withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
pRspObj->rsp.withSchema = true;
pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void*));
}
// extract the rows in this data packet
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows);
pVg->numOfRows += rows;
(*numOfRows) += rows;
if (!pRspObj->rsp.withSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
pRspObj->rsp.withSchema = true;
pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void *));
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if(schema){
taosArrayPush(pRspObj->rsp.blockSchema, &schema);