feat:remove add schemaExt to SMqDataRsp
This commit is contained in:
parent
353b3ea99b
commit
6dc98650d5
|
@ -4341,7 +4341,6 @@ typedef struct {
|
|||
};
|
||||
void* data; //for free in client, only effected if type is data or metadata. raw data not effected
|
||||
bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client)
|
||||
SArray* blockSchemaExt; // save decimal info
|
||||
} SMqDataRsp;
|
||||
|
||||
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
|
||||
|
|
|
@ -215,7 +215,6 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
|||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||
|
||||
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
|
||||
const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
|
|
|
@ -133,14 +133,12 @@ typedef struct SMetaTableInfo {
|
|||
int64_t suid;
|
||||
int64_t uid;
|
||||
SSchemaWrapper* schema;
|
||||
SExtSchema* pExtSchemas;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} SMetaTableInfo;
|
||||
|
||||
static FORCE_INLINE void destroyMetaTableInfo(SMetaTableInfo* mtInfo){
|
||||
if (mtInfo == NULL) return;
|
||||
tDeleteSchemaWrapper(mtInfo->schema);
|
||||
taosMemoryFreeClear(mtInfo->pExtSchemas);
|
||||
}
|
||||
|
||||
typedef struct SSnapContext {
|
||||
|
|
|
@ -2452,8 +2452,18 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
|
|||
uint64_t groupId = *(uint64_t*)p;
|
||||
p += sizeof(uint64_t);
|
||||
|
||||
// type+bytes
|
||||
p += (sizeof(int32_t) + sizeof(int8_t)) * pResultInfo->numOfCols;
|
||||
// check fields
|
||||
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
||||
int8_t type = *(int8_t*)p;
|
||||
p += sizeof(int8_t);
|
||||
|
||||
int32_t bytes = *(int32_t*)p;
|
||||
p += sizeof(int32_t);
|
||||
|
||||
if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
|
||||
extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t* colLength = (int32_t*)p;
|
||||
p += sizeof(int32_t) * pResultInfo->numOfCols;
|
||||
|
|
|
@ -3011,10 +3011,8 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
|
|||
if (data->withSchema) {
|
||||
doFreeReqResultInfo(&pRspObj->resInfo);
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
|
||||
SExtSchema* pSWExt = (data->blockSchemaExt == NULL) ? NULL :
|
||||
(SExtSchema*)taosArrayGetP(data->blockSchemaExt, pRspObj->resIter);
|
||||
if (pSW) {
|
||||
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, pSWExt));
|
||||
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11674,51 +11674,11 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tEncodeMqDataRspSchemaExt(SEncoder *pEncoder, const SMqDataRsp *pRsp){
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
if (!pRsp->withSchema){
|
||||
return 0;
|
||||
}
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
|
||||
TSDB_CHECK_NULL(pSW, code, lino, _exit, terrno);
|
||||
SExtSchema *pSWExt = (SExtSchema *)taosArrayGetP(pRsp->blockSchemaExt, i);
|
||||
if (pSWExt != NULL){
|
||||
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t *)pSWExt, sizeof(SExtSchema) * pSW->nCols));
|
||||
}
|
||||
}
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tDecodeMqDataRspSchemaExt(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
if (!pRsp->withSchema) {
|
||||
return 0;
|
||||
}
|
||||
if ((pRsp->blockSchemaExt = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
void * data = NULL;
|
||||
uint32_t bLen = 0;
|
||||
TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t **)&data, &bLen));
|
||||
if (taosArrayPush(pRsp->blockSchemaExt, &data) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
}
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->sleepTime));
|
||||
TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp));
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
@ -11802,9 +11762,6 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_RETURN(tDecodeMqDataRspSchemaExt(pDecoder, pRsp));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -11831,12 +11788,6 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
|
|||
pRsp->blockData = NULL;
|
||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
|
||||
pRsp->blockSchema = NULL;
|
||||
if (pRsp->blockDataElementFree){
|
||||
taosArrayDestroyP(pRsp->blockSchemaExt, NULL);
|
||||
} else {
|
||||
taosArrayDestroy(pRsp->blockSchemaExt);
|
||||
}
|
||||
pRsp->blockSchemaExt = NULL;
|
||||
taosArrayDestroyP(pRsp->blockTbName, NULL);
|
||||
pRsp->blockTbName = NULL;
|
||||
tOffsetDestroy(&pRsp->reqOffset);
|
||||
|
@ -11859,7 +11810,6 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
|||
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
|
||||
}
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp));
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
@ -11891,9 +11841,6 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeMqDataRspSchemaExt(pDecoder, pRsp));
|
||||
}
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -108,16 +108,6 @@ struct STQ {
|
|||
SStreamMeta* pStreamMeta;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
SSchemaWrapper *pSchemaWrapper;
|
||||
SExtSchema *extSchema;
|
||||
}TQSchema;
|
||||
|
||||
static FORCE_INLINE void freeTqSchema(TQSchema* pSchema) {
|
||||
tDeleteSchemaWrapper(pSchema->pSchemaWrapper);
|
||||
taosMemoryFreeClear(pSchema->extSchema);
|
||||
}
|
||||
|
||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||
void tqDestroyTqHandle(void* data);
|
||||
|
|
|
@ -170,7 +170,6 @@ int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t
|
|||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema** extSchema);
|
||||
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock);
|
||||
SExtSchema* metaGetSExtSchema(const SMetaEntry *pME);
|
||||
SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols);
|
||||
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||
|
|
|
@ -92,17 +92,6 @@ SExtSchema* metaGetSExtSchema(const SMetaEntry *pME) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols) {
|
||||
if (src == NULL || nCols <= 0) {
|
||||
return NULL;
|
||||
}
|
||||
SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * nCols);
|
||||
if (ret != NULL){
|
||||
memcpy(ret, src, nCols * sizeof(SExtSchema));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||
const SColCmprWrapper *pw = &pME->colCmpr;
|
||||
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols));
|
||||
|
|
|
@ -256,7 +256,6 @@ typedef struct STableInfoForChildTable {
|
|||
char* tableName;
|
||||
SSchemaWrapper* schemaRow;
|
||||
SSchemaWrapper* tagRow;
|
||||
SExtSchema* pExtSchemas;
|
||||
} STableInfoForChildTable;
|
||||
|
||||
static void destroySTableInfoForChildTable(void* data) {
|
||||
|
@ -264,7 +263,6 @@ static void destroySTableInfoForChildTable(void* data) {
|
|||
taosMemoryFree(pData->tableName);
|
||||
tDeleteSchemaWrapper(pData->schemaRow);
|
||||
tDeleteSchemaWrapper(pData->tagRow);
|
||||
taosMemoryFree(pData->pExtSchemas);
|
||||
}
|
||||
|
||||
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
|
||||
|
@ -338,11 +336,6 @@ static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInf
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
dataTmp.pExtSchemas = metaGetSExtSchema(me);
|
||||
if (dataTmp.pExtSchemas == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto END;
|
||||
}
|
||||
code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
|
||||
if (code != 0) {
|
||||
goto END;
|
||||
|
@ -811,11 +804,9 @@ int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result)
|
|||
}
|
||||
result->suid = me.ctbEntry.suid;
|
||||
result->schema = tCloneSSchemaWrapper(data->schemaRow);
|
||||
result->pExtSchemas = metaCloneSExtSchema(data->pExtSchemas, data->schemaRow->nCols);
|
||||
} else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
|
||||
result->suid = 0;
|
||||
result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
||||
result->pExtSchemas = metaGetSExtSchema(&me);
|
||||
} else {
|
||||
metaDebug("tmqsnap get uid continue");
|
||||
tDecoderClear(&dc);
|
||||
|
|
|
@ -598,7 +598,7 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tqMaskBlock(SSchemaWrapper* pDst, SExtSchema* extDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) {
|
||||
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) {
|
||||
if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
@ -622,7 +622,6 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SExtSchema* extDst, SSDataBlock* pBloc
|
|||
SColumnInfoData colInfo =
|
||||
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
|
||||
if (extSrc != NULL) {
|
||||
extDst[j++] = extSrc[i];
|
||||
decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
||||
}
|
||||
code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||
|
@ -658,7 +657,7 @@ static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, c
|
|||
SSchema* pColSchema = &pSchema->pSchema[i];
|
||||
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
||||
|
||||
if (pReader->extSchema != NULL) {
|
||||
if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
|
||||
decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
||||
}
|
||||
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||
|
@ -688,6 +687,9 @@ static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, c
|
|||
j++;
|
||||
} else {
|
||||
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
|
||||
if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
|
||||
decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
||||
}
|
||||
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
|
@ -895,7 +897,6 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
|
|||
int32_t* lastRow) {
|
||||
int32_t code = 0;
|
||||
SSchemaWrapper* pSW = NULL;
|
||||
SExtSchema* pSWExt = NULL;
|
||||
SSDataBlock* block = NULL;
|
||||
if (taosArrayGetSize(blocks) > 0) {
|
||||
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
|
||||
|
@ -909,10 +910,8 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
|
|||
|
||||
pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
TQ_NULL_GO_TO_END(pSW);
|
||||
pSWExt = taosMemoryCalloc(1, sizeof(SExtSchema));
|
||||
TQ_NULL_GO_TO_END(pSWExt);
|
||||
|
||||
TQ_ERR_GO_TO_END(tqMaskBlock(pSW, pSWExt, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
|
||||
TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
|
||||
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||
(int32_t)taosArrayGetSize(block->pDataBlock));
|
||||
|
||||
|
@ -920,10 +919,8 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
|
|||
block->info.version = pReader->msg.ver;
|
||||
TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
|
||||
TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
|
||||
TQSchema schema = {pSW, pSWExt};
|
||||
TQ_NULL_GO_TO_END(taosArrayPush(schemas, &schema));
|
||||
TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
|
||||
pSW = NULL;
|
||||
pSWExt = NULL;
|
||||
|
||||
taosMemoryFreeClear(block);
|
||||
|
||||
|
@ -934,7 +931,6 @@ END:
|
|||
tDeleteSchemaWrapper(pSW);
|
||||
blockDataFreeRes(block);
|
||||
taosMemoryFree(block);
|
||||
taosMemoryFree(pSWExt);
|
||||
return code;
|
||||
}
|
||||
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
|
||||
|
@ -1149,12 +1145,10 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
|
|||
return code;
|
||||
}
|
||||
} else if (rawList != NULL) {
|
||||
TQSchema schema = {pReader->pSchemaWrapper, pReader->extSchema};
|
||||
if (taosArrayPush(schemas, &schema) == NULL){
|
||||
if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
|
||||
return terrno;
|
||||
}
|
||||
pReader->pSchemaWrapper = NULL;
|
||||
pReader->extSchema = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -238,7 +238,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
int32_t lino = 0;
|
||||
char* tbName = NULL;
|
||||
SSchemaWrapper* pSW = NULL;
|
||||
SExtSchema* pSWExt = NULL;
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
||||
|
@ -267,10 +266,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
|
||||
pSW = NULL;
|
||||
pSWExt = metaCloneSExtSchema(qExtractSchemaExtFromTask(task), pSW->nCols);
|
||||
TSDB_CHECK_NULL(pSWExt, code, lino, END, terrno);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchemaExt, &pSWExt), code, lino, END, terrno);
|
||||
pSWExt = NULL;
|
||||
}
|
||||
|
||||
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
|
||||
|
@ -319,7 +314,6 @@ END:
|
|||
tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
|
||||
}
|
||||
tDeleteSchemaWrapper(pSW);
|
||||
taosMemoryFree(pSWExt);
|
||||
taosMemoryFree(tbName);
|
||||
return code;
|
||||
}
|
||||
|
@ -336,7 +330,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
|
|||
|
||||
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
|
||||
pSchemas = taosArrayInit(0, sizeof(TQSchema));
|
||||
pSchemas = taosArrayInit(0, sizeof(void*));
|
||||
TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = NULL;
|
||||
|
@ -381,17 +375,12 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
|
|||
*totalRows += pBlock->info.rows;
|
||||
}
|
||||
|
||||
TQSchema* schema = (TQSchema*)taosArrayGet(pSchemas, i);
|
||||
if (taosArrayPush(pRsp->blockSchema, &schema->pSchemaWrapper) == NULL){
|
||||
void** pSW = taosArrayGet(pSchemas, i);
|
||||
if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){
|
||||
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
if (taosArrayPush(pRsp->blockSchemaExt, &schema->extSchema) == NULL){
|
||||
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
schema->pSchemaWrapper = NULL;
|
||||
schema->extSchema = NULL;
|
||||
*pSW = NULL;
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
|
||||
|
@ -400,7 +389,7 @@ END:
|
|||
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
|
||||
}
|
||||
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
|
||||
taosArrayDestroyP(pSchemas, (FDelete)freeTqSchema);
|
||||
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
|
||||
}
|
||||
|
||||
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
|
||||
|
|
|
@ -75,8 +75,6 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
|||
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
|
||||
TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
|
||||
|
||||
pRsp->blockSchemaExt = taosArrayInit(0, sizeof(void*));
|
||||
TSDB_CHECK_NULL(pRsp->blockSchemaExt, code, lino, END, terrno);
|
||||
END:
|
||||
if (code != 0){
|
||||
tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
|
||||
|
@ -84,7 +82,6 @@ END:
|
|||
taosArrayDestroy(pRsp->blockDataLen);
|
||||
taosArrayDestroy(pRsp->blockTbName);
|
||||
taosArrayDestroy(pRsp->blockSchema);
|
||||
taosArrayDestroy(pRsp->blockSchemaExt);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -64,7 +64,6 @@ typedef struct {
|
|||
int8_t sourceExcluded;
|
||||
int64_t snapshotVer;
|
||||
SSchemaWrapper* schema;
|
||||
SExtSchema* pExtSchemas;
|
||||
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
||||
int8_t recoverStep;
|
||||
int8_t recoverScanFinished;
|
||||
|
|
|
@ -1306,11 +1306,6 @@ const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
|
|||
return pTaskInfo->streamInfo.schema;
|
||||
}
|
||||
|
||||
const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return pTaskInfo->streamInfo.pExtSchemas;
|
||||
}
|
||||
|
||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return pTaskInfo->streamInfo.tbName;
|
||||
|
@ -1601,9 +1596,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
|
||||
// pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
|
||||
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
taosMemoryFreeClear(pTaskInfo->streamInfo.pExtSchemas);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
pTaskInfo->streamInfo.pExtSchemas = mtInfo.pExtSchemas;
|
||||
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
|
|
|
@ -261,7 +261,6 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
|||
|
||||
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
|
||||
tDeleteSchemaWrapper(pStreamInfo->schema);
|
||||
taosMemoryFreeClear(pStreamInfo->pExtSchemas);
|
||||
tOffsetDestroy(&pStreamInfo->currentOffset);
|
||||
tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
|
||||
taosMemoryFree(pStreamInfo->stbFullName);
|
||||
|
|
Loading…
Reference in New Issue