enh(tmq): adaptive schema

This commit is contained in:
Liu Jicong 2022-05-17 17:36:01 +08:00
parent 8ceae5854d
commit d762ec6371
2 changed files with 6 additions and 2 deletions

View File

@ -234,6 +234,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
if (msg->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(msg->rsp.blockSchema, msg->resIter);
setResSchemaInfo(&msg->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(msg->resInfo.row);
taosMemoryFreeClear(msg->resInfo.pCol);
taosMemoryFreeClear(msg->resInfo.length);
taosMemoryFreeClear(msg->resInfo.convertBuf);
}
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4);
return &msg->resInfo;
@ -310,7 +314,7 @@ void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res);
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
#ifdef __cplusplus
}

View File

@ -79,7 +79,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
}
int32_t schemaLen = 0;
if (pTopic->schema.nCols) {
taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
}
int32_t size =
sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;