feat:add schemaExt to SMqDataRsp
This commit is contained in:
parent
d5510a047d
commit
353b3ea99b
|
@ -901,6 +901,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
||||||
|
if (pSW == NULL) {return TSDB_CODE_INVALID_PARA;}
|
||||||
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->nCols));
|
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->nCols));
|
||||||
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->version));
|
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->version));
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
@ -910,6 +911,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||||
|
if (pSW == NULL) {return TSDB_CODE_INVALID_PARA;}
|
||||||
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->nCols));
|
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->nCols));
|
||||||
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->version));
|
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->version));
|
||||||
|
|
||||||
|
@ -4339,7 +4341,7 @@ typedef struct {
|
||||||
};
|
};
|
||||||
void* data; //for free in client, only effected if type is data or metadata. raw data not effected
|
void* data; //for free in client, only effected if type is data or metadata. raw data not effected
|
||||||
bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client)
|
bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client)
|
||||||
|
SArray* blockSchemaExt; // save decimal info
|
||||||
} SMqDataRsp;
|
} SMqDataRsp;
|
||||||
|
|
||||||
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
|
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj);
|
||||||
|
|
|
@ -215,6 +215,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
|
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
|
||||||
|
const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
|
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
|
|
|
@ -133,9 +133,16 @@ typedef struct SMetaTableInfo {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
SSchemaWrapper* schema;
|
SSchemaWrapper* schema;
|
||||||
|
SExtSchema* pExtSchemas;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN];
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
} SMetaTableInfo;
|
} SMetaTableInfo;
|
||||||
|
|
||||||
|
static FORCE_INLINE void destroyMetaTableInfo(SMetaTableInfo* mtInfo){
|
||||||
|
if (mtInfo == NULL) return;
|
||||||
|
tDeleteSchemaWrapper(mtInfo->schema);
|
||||||
|
taosMemoryFreeClear(mtInfo->pExtSchemas);
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct SSnapContext {
|
typedef struct SSnapContext {
|
||||||
struct SMeta* pMeta;
|
struct SMeta* pMeta;
|
||||||
int64_t snapVersion;
|
int64_t snapVersion;
|
||||||
|
|
|
@ -187,7 +187,7 @@ int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash);
|
||||||
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data);
|
int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data);
|
||||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
|
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
|
||||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
|
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
|
||||||
int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen);
|
int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen);
|
||||||
|
|
||||||
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
||||||
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
|
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
|
||||||
|
|
|
@ -1975,10 +1975,11 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
|
||||||
int j = 0;
|
int j = 0;
|
||||||
for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
|
for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
|
||||||
SSchema* pColSchema = &pTableMeta->schema[j];
|
SSchema* pColSchema = &pTableMeta->schema[j];
|
||||||
|
SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
|
||||||
char* fieldName = pSW->pSchema[i].name;
|
char* fieldName = pSW->pSchema[i].name;
|
||||||
|
|
||||||
if (strcmp(pColSchema->name, fieldName) == 0) {
|
if (strcmp(pColSchema->name, fieldName) == 0) {
|
||||||
if (checkSchema(pColSchema, fields, NULL, 0) != 0){
|
if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0){
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -3011,9 +3011,10 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
|
||||||
if (data->withSchema) {
|
if (data->withSchema) {
|
||||||
doFreeReqResultInfo(&pRspObj->resInfo);
|
doFreeReqResultInfo(&pRspObj->resInfo);
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
|
||||||
|
SExtSchema* pSWExt = (data->blockSchemaExt == NULL) ? NULL :
|
||||||
|
(SExtSchema*)taosArrayGetP(data->blockSchemaExt, pRspObj->resIter);
|
||||||
if (pSW) {
|
if (pSW) {
|
||||||
// TODO wjm tmq support ext schema
|
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, pSWExt));
|
||||||
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11674,11 +11674,54 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
static int32_t tEncodeMqDataRspSchemaExt(SEncoder *pEncoder, const SMqDataRsp *pRsp){
|
||||||
TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
int32_t code = 0;
|
||||||
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime));
|
int32_t lino = 0;
|
||||||
|
if (!pRsp->withSchema){
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||||
|
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
|
||||||
|
TSDB_CHECK_NULL(pSW, code, lino, _exit, terrno);
|
||||||
|
SExtSchema *pSWExt = (SExtSchema *)taosArrayGetP(pRsp->blockSchemaExt, i);
|
||||||
|
if (pSWExt != NULL){
|
||||||
|
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t *)pSWExt, sizeof(SExtSchema) * pSW->nCols));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
static int32_t tDecodeMqDataRspSchemaExt(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
if (!pRsp->withSchema) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if ((pRsp->blockSchemaExt = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) {
|
||||||
|
TAOS_CHECK_EXIT(terrno);
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||||
|
void * data = NULL;
|
||||||
|
uint32_t bLen = 0;
|
||||||
|
TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t **)&data, &bLen));
|
||||||
|
if (taosArrayPush(pRsp->blockSchemaExt, &data) == NULL) {
|
||||||
|
TAOS_CHECK_EXIT(terrno);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->sleepTime));
|
||||||
|
TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp));
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
|
@ -11759,6 +11802,9 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
if (!tDecodeIsEnd(pDecoder)) {
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
|
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
|
||||||
}
|
}
|
||||||
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
|
TAOS_CHECK_RETURN(tDecodeMqDataRspSchemaExt(pDecoder, pRsp));
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -11785,6 +11831,12 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
|
||||||
pRsp->blockData = NULL;
|
pRsp->blockData = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
|
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
|
||||||
pRsp->blockSchema = NULL;
|
pRsp->blockSchema = NULL;
|
||||||
|
if (pRsp->blockDataElementFree){
|
||||||
|
taosArrayDestroyP(pRsp->blockSchemaExt, NULL);
|
||||||
|
} else {
|
||||||
|
taosArrayDestroy(pRsp->blockSchemaExt);
|
||||||
|
}
|
||||||
|
pRsp->blockSchemaExt = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockTbName, NULL);
|
taosArrayDestroyP(pRsp->blockTbName, NULL);
|
||||||
pRsp->blockTbName = NULL;
|
pRsp->blockTbName = NULL;
|
||||||
tOffsetDestroy(&pRsp->reqOffset);
|
tOffsetDestroy(&pRsp->reqOffset);
|
||||||
|
@ -11796,7 +11848,7 @@ void tDeleteMqDataRsp(SMqDataRsp *rsp) { tDeleteMqDataRspCommon(rsp); }
|
||||||
|
|
||||||
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino = 0;
|
||||||
|
|
||||||
TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
||||||
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->createTableNum));
|
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->createTableNum));
|
||||||
|
@ -11807,6 +11859,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
||||||
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
|
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp));
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
@ -11838,7 +11891,9 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
|
TAOS_CHECK_EXIT(tDecodeMqDataRspSchemaExt(pDecoder, pRsp));
|
||||||
|
}
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,6 +108,16 @@ struct STQ {
|
||||||
SStreamMeta* pStreamMeta;
|
SStreamMeta* pStreamMeta;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SSchemaWrapper *pSchemaWrapper;
|
||||||
|
SExtSchema *extSchema;
|
||||||
|
}TQSchema;
|
||||||
|
|
||||||
|
static FORCE_INLINE void freeTqSchema(TQSchema* pSchema) {
|
||||||
|
tDeleteSchemaWrapper(pSchema->pSchemaWrapper);
|
||||||
|
taosMemoryFreeClear(pSchema->extSchema);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
void tqDestroyTqHandle(void* data);
|
void tqDestroyTqHandle(void* data);
|
||||||
|
|
|
@ -167,10 +167,10 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb
|
||||||
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
|
int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount);
|
||||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||||
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
|
int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
|
||||||
int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock);
|
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema** extSchema);
|
||||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime, SExtSchema** extSchema);
|
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock);
|
||||||
SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME);
|
SExtSchema* metaGetSExtSchema(const SMetaEntry *pME);
|
||||||
void metaFreeSExtSchema(SExtSchema *p);
|
SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols);
|
||||||
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||||
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema);
|
||||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
|
||||||
|
|
|
@ -70,7 +70,7 @@ static int32_t metaDecodeExtSchemas(SDecoder* pDecoder, SMetaEntry* pME) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME) {
|
SExtSchema* metaGetSExtSchema(const SMetaEntry *pME) {
|
||||||
const SSchemaWrapper *pSchWrapper = NULL;
|
const SSchemaWrapper *pSchWrapper = NULL;
|
||||||
bool hasTypeMods = false;
|
bool hasTypeMods = false;
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
|
@ -84,16 +84,23 @@ SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME) {
|
||||||
|
|
||||||
if (hasTypeMods) {
|
if (hasTypeMods) {
|
||||||
SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * pSchWrapper->nCols);
|
SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * pSchWrapper->nCols);
|
||||||
memcpy(ret, pME->pExtSchemas, pSchWrapper->nCols * sizeof(SExtSchema));
|
if (ret != NULL){
|
||||||
|
memcpy(ret, pME->pExtSchemas, pSchWrapper->nCols * sizeof(SExtSchema));
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaFreeSExtSchema(SExtSchema *p) {
|
SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols) {
|
||||||
if (p) {
|
if (src == NULL || nCols <= 0) {
|
||||||
taosMemoryFreeClear(p);
|
return NULL;
|
||||||
}
|
}
|
||||||
|
SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * nCols);
|
||||||
|
if (ret != NULL){
|
||||||
|
memcpy(ret, src, nCols * sizeof(SExtSchema));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
||||||
|
|
|
@ -378,7 +378,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime, SExtSchema** extSchema) {
|
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema** extSchema) {
|
||||||
void *pData = NULL;
|
void *pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
@ -409,7 +409,7 @@ _query:
|
||||||
if (me.type == TSDB_SUPER_TABLE) {
|
if (me.type == TSDB_SUPER_TABLE) {
|
||||||
if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
|
if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
|
||||||
pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
|
pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
|
||||||
if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me);
|
if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -420,7 +420,7 @@ _query:
|
||||||
} else {
|
} else {
|
||||||
if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
|
if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
|
||||||
pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
||||||
if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me);
|
if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -437,7 +437,7 @@ _query:
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pSchema = tCloneSSchemaWrapper(&schema);
|
pSchema = tCloneSSchemaWrapper(&schema);
|
||||||
if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me);
|
if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me);
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -667,7 +667,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
|
||||||
STSchema *pTSchema = NULL;
|
STSchema *pTSchema = NULL;
|
||||||
SSchemaWrapper *pSW = NULL;
|
SSchemaWrapper *pSW = NULL;
|
||||||
|
|
||||||
pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL, NULL);
|
pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL);
|
||||||
if (!pSW) return NULL;
|
if (!pSW) return NULL;
|
||||||
|
|
||||||
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);
|
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);
|
||||||
|
|
|
@ -256,6 +256,7 @@ typedef struct STableInfoForChildTable {
|
||||||
char* tableName;
|
char* tableName;
|
||||||
SSchemaWrapper* schemaRow;
|
SSchemaWrapper* schemaRow;
|
||||||
SSchemaWrapper* tagRow;
|
SSchemaWrapper* tagRow;
|
||||||
|
SExtSchema* pExtSchemas;
|
||||||
} STableInfoForChildTable;
|
} STableInfoForChildTable;
|
||||||
|
|
||||||
static void destroySTableInfoForChildTable(void* data) {
|
static void destroySTableInfoForChildTable(void* data) {
|
||||||
|
@ -263,6 +264,7 @@ static void destroySTableInfoForChildTable(void* data) {
|
||||||
taosMemoryFree(pData->tableName);
|
taosMemoryFree(pData->tableName);
|
||||||
tDeleteSchemaWrapper(pData->schemaRow);
|
tDeleteSchemaWrapper(pData->schemaRow);
|
||||||
tDeleteSchemaWrapper(pData->tagRow);
|
tDeleteSchemaWrapper(pData->tagRow);
|
||||||
|
taosMemoryFree(pData->pExtSchemas);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
|
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
|
||||||
|
@ -336,6 +338,11 @@ static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInf
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
dataTmp.pExtSchemas = metaGetSExtSchema(me);
|
||||||
|
if (dataTmp.pExtSchemas == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
|
code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -593,7 +600,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
|
||||||
|
|
||||||
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
|
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, NULL);
|
SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
|
||||||
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
|
@ -804,9 +811,11 @@ int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result)
|
||||||
}
|
}
|
||||||
result->suid = me.ctbEntry.suid;
|
result->suid = me.ctbEntry.suid;
|
||||||
result->schema = tCloneSSchemaWrapper(data->schemaRow);
|
result->schema = tCloneSSchemaWrapper(data->schemaRow);
|
||||||
|
result->pExtSchemas = metaCloneSExtSchema(data->pExtSchemas, data->schemaRow->nCols);
|
||||||
} else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
|
} else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
|
||||||
result->suid = 0;
|
result->suid = 0;
|
||||||
result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
|
||||||
|
result->pExtSchemas = metaGetSExtSchema(&me);
|
||||||
} else {
|
} else {
|
||||||
metaDebug("tmqsnap get uid continue");
|
metaDebug("tmqsnap get uid continue");
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
|
|
|
@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, NULL);
|
SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
|
||||||
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
|
@ -336,7 +336,7 @@ void tqReaderClose(STqReader* pReader) {
|
||||||
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
metaFreeSExtSchema(pReader->extSchema);
|
taosMemoryFree(pReader->extSchema);
|
||||||
if (pReader->pColIdList) {
|
if (pReader->pColIdList) {
|
||||||
taosArrayDestroy(pReader->pColIdList);
|
taosArrayDestroy(pReader->pColIdList);
|
||||||
}
|
}
|
||||||
|
@ -598,7 +598,7 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSchema) {
|
int32_t tqMaskBlock(SSchemaWrapper* pDst, SExtSchema* extDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) {
|
||||||
if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
|
if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -621,8 +621,9 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
||||||
pDst->pSchema[j++] = pSrc->pSchema[i];
|
pDst->pSchema[j++] = pSrc->pSchema[i];
|
||||||
SColumnInfoData colInfo =
|
SColumnInfoData colInfo =
|
||||||
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
|
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
|
||||||
if (extSchema != NULL) {
|
if (extSrc != NULL) {
|
||||||
decimalFromTypeMod(extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
extDst[j++] = extSrc[i];
|
||||||
|
decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
|
||||||
}
|
}
|
||||||
code = blockDataAppendColInfo(pBlock, &colInfo);
|
code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -748,8 +749,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
|
||||||
if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
|
if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
|
||||||
(pReader->cachedSchemaVer != sversion)) {
|
(pReader->cachedSchemaVer != sversion)) {
|
||||||
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
||||||
metaFreeSExtSchema(pReader->extSchema);
|
taosMemoryFree(pReader->extSchema);
|
||||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL, &pReader->extSchema);
|
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
|
||||||
if (pReader->pSchemaWrapper == NULL) {
|
if (pReader->pSchemaWrapper == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
|
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
|
||||||
"version %d, possibly dropped table",
|
"version %d, possibly dropped table",
|
||||||
|
@ -890,10 +891,11 @@ END:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
|
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
|
||||||
SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
|
char* assigned, int32_t numOfRows, int32_t curRow,
|
||||||
int32_t* lastRow) {
|
int32_t* lastRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSchemaWrapper* pSW = NULL;
|
SSchemaWrapper* pSW = NULL;
|
||||||
|
SExtSchema* pSWExt = NULL;
|
||||||
SSDataBlock* block = NULL;
|
SSDataBlock* block = NULL;
|
||||||
if (taosArrayGetSize(blocks) > 0) {
|
if (taosArrayGetSize(blocks) > 0) {
|
||||||
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
|
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
|
||||||
|
@ -907,8 +909,10 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
|
||||||
|
|
||||||
pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||||
TQ_NULL_GO_TO_END(pSW);
|
TQ_NULL_GO_TO_END(pSW);
|
||||||
|
pSWExt = taosMemoryCalloc(1, sizeof(SExtSchema));
|
||||||
|
TQ_NULL_GO_TO_END(pSWExt);
|
||||||
|
|
||||||
TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned, pReader->extSchema));
|
TQ_ERR_GO_TO_END(tqMaskBlock(pSW, pSWExt, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
|
||||||
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
|
||||||
(int32_t)taosArrayGetSize(block->pDataBlock));
|
(int32_t)taosArrayGetSize(block->pDataBlock));
|
||||||
|
|
||||||
|
@ -916,8 +920,11 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData,
|
||||||
block->info.version = pReader->msg.ver;
|
block->info.version = pReader->msg.ver;
|
||||||
TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
|
TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
|
||||||
TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
|
TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
|
||||||
TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
|
TQSchema schema = {pSW, pSWExt};
|
||||||
|
TQ_NULL_GO_TO_END(taosArrayPush(schemas, &schema));
|
||||||
pSW = NULL;
|
pSW = NULL;
|
||||||
|
pSWExt = NULL;
|
||||||
|
|
||||||
taosMemoryFreeClear(block);
|
taosMemoryFreeClear(block);
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
@ -927,6 +934,7 @@ END:
|
||||||
tDeleteSchemaWrapper(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
blockDataFreeRes(block);
|
blockDataFreeRes(block);
|
||||||
taosMemoryFree(block);
|
taosMemoryFree(block);
|
||||||
|
taosMemoryFree(pSWExt);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
|
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
|
||||||
|
@ -956,7 +964,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
|
||||||
}
|
}
|
||||||
|
|
||||||
if (buildNew) {
|
if (buildNew) {
|
||||||
TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
|
TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
|
||||||
curRow, &lastRow));
|
curRow, &lastRow));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1020,7 +1028,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
|
||||||
}
|
}
|
||||||
|
|
||||||
if (buildNew) {
|
if (buildNew) {
|
||||||
TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
|
TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
|
||||||
curRow, &lastRow));
|
curRow, &lastRow));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1126,8 +1134,8 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
|
||||||
pReader->lastBlkUid = uid;
|
pReader->lastBlkUid = uid;
|
||||||
|
|
||||||
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
tDeleteSchemaWrapper(pReader->pSchemaWrapper);
|
||||||
metaFreeSExtSchema(pReader->extSchema);
|
taosMemoryFree(pReader->extSchema);
|
||||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime, &pReader->extSchema);
|
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
|
||||||
if (pReader->pSchemaWrapper == NULL) {
|
if (pReader->pSchemaWrapper == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
||||||
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
|
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
|
||||||
|
@ -1141,10 +1149,12 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else if (rawList != NULL) {
|
} else if (rawList != NULL) {
|
||||||
if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
|
TQSchema schema = {pReader->pSchemaWrapper, pReader->extSchema};
|
||||||
|
if (taosArrayPush(schemas, &schema) == NULL){
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
pReader->pSchemaWrapper = NULL;
|
pReader->pSchemaWrapper = NULL;
|
||||||
|
pReader->extSchema = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -238,6 +238,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
char* tbName = NULL;
|
char* tbName = NULL;
|
||||||
SSchemaWrapper* pSW = NULL;
|
SSchemaWrapper* pSW = NULL;
|
||||||
|
SExtSchema* pSWExt = NULL;
|
||||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
||||||
|
@ -266,6 +267,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
||||||
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
|
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
|
||||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
|
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
|
||||||
pSW = NULL;
|
pSW = NULL;
|
||||||
|
pSWExt = metaCloneSExtSchema(qExtractSchemaExtFromTask(task), pSW->nCols);
|
||||||
|
TSDB_CHECK_NULL(pSWExt, code, lino, END, terrno);
|
||||||
|
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchemaExt, &pSWExt), code, lino, END, terrno);
|
||||||
|
pSWExt = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
|
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
|
||||||
|
@ -313,7 +318,8 @@ END:
|
||||||
if (code != 0){
|
if (code != 0){
|
||||||
tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
|
tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
|
||||||
}
|
}
|
||||||
taosMemoryFree(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
|
taosMemoryFree(pSWExt);
|
||||||
taosMemoryFree(tbName);
|
taosMemoryFree(tbName);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -330,7 +336,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
|
||||||
|
|
||||||
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
|
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
|
TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
|
||||||
pSchemas = taosArrayInit(0, sizeof(void*));
|
pSchemas = taosArrayInit(0, sizeof(TQSchema));
|
||||||
TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
|
TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = NULL;
|
SSubmitTbData* pSubmitTbData = NULL;
|
||||||
|
@ -375,12 +381,17 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
|
||||||
*totalRows += pBlock->info.rows;
|
*totalRows += pBlock->info.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
void** pSW = taosArrayGet(pSchemas, i);
|
TQSchema* schema = (TQSchema*)taosArrayGet(pSchemas, i);
|
||||||
if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){
|
if (taosArrayPush(pRsp->blockSchema, &schema->pSchemaWrapper) == NULL){
|
||||||
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
*pSW = NULL;
|
if (taosArrayPush(pRsp->blockSchemaExt, &schema->extSchema) == NULL){
|
||||||
|
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
schema->pSchemaWrapper = NULL;
|
||||||
|
schema->extSchema = NULL;
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
|
tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
|
||||||
|
@ -389,7 +400,7 @@ END:
|
||||||
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
|
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
|
||||||
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
|
taosArrayDestroyP(pSchemas, (FDelete)freeTqSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
|
static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
|
||||||
|
|
|
@ -75,7 +75,8 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
|
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
|
||||||
TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
|
TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
|
||||||
|
|
||||||
|
pRsp->blockSchemaExt = taosArrayInit(0, sizeof(void*));
|
||||||
|
TSDB_CHECK_NULL(pRsp->blockSchemaExt, code, lino, END, terrno);
|
||||||
END:
|
END:
|
||||||
if (code != 0){
|
if (code != 0){
|
||||||
tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
|
tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code));
|
||||||
|
@ -83,6 +84,7 @@ END:
|
||||||
taosArrayDestroy(pRsp->blockDataLen);
|
taosArrayDestroy(pRsp->blockDataLen);
|
||||||
taosArrayDestroy(pRsp->blockTbName);
|
taosArrayDestroy(pRsp->blockTbName);
|
||||||
taosArrayDestroy(pRsp->blockSchema);
|
taosArrayDestroy(pRsp->blockSchema);
|
||||||
|
taosArrayDestroy(pRsp->blockSchemaExt);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -760,7 +760,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
|
||||||
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL, NULL);
|
SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL);
|
||||||
if (pSW) {
|
if (pSW) {
|
||||||
*num = pSW->nCols;
|
*num = pSW->nCols;
|
||||||
tDeleteSchemaWrapper(pSW);
|
tDeleteSchemaWrapper(pSW);
|
||||||
|
|
|
@ -64,6 +64,7 @@ typedef struct {
|
||||||
int8_t sourceExcluded;
|
int8_t sourceExcluded;
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
SSchemaWrapper* schema;
|
SSchemaWrapper* schema;
|
||||||
|
SExtSchema* pExtSchemas;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
||||||
int8_t recoverStep;
|
int8_t recoverStep;
|
||||||
int8_t recoverScanFinished;
|
int8_t recoverScanFinished;
|
||||||
|
|
|
@ -1306,6 +1306,11 @@ const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
|
||||||
return pTaskInfo->streamInfo.schema;
|
return pTaskInfo->streamInfo.schema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
return pTaskInfo->streamInfo.pExtSchemas;
|
||||||
|
}
|
||||||
|
|
||||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
return pTaskInfo->streamInfo.tbName;
|
return pTaskInfo->streamInfo.tbName;
|
||||||
|
@ -1536,6 +1541,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
SMetaTableInfo mtInfo = {0};
|
SMetaTableInfo mtInfo = {0};
|
||||||
code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
|
code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
destroyMetaTableInfo(&mtInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
|
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
|
||||||
|
@ -1545,7 +1551,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
tableListClear(pTableListInfo);
|
tableListClear(pTableListInfo);
|
||||||
|
|
||||||
if (mtInfo.uid == 0) {
|
if (mtInfo.uid == 0) {
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
goto end; // no data
|
goto end; // no data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1553,7 +1559,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
|
code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
|
if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
|
||||||
|
@ -1564,7 +1570,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
|
|
||||||
code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
|
code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1572,14 +1578,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
|
||||||
if (!pList) {
|
if (!pList) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
code = tableListGetSize(pTableListInfo, &size);
|
code = tableListGetSize(pTableListInfo, &size);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1587,7 +1593,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
NULL, (void**)&pInfo->dataReader, NULL, NULL);
|
NULL, (void**)&pInfo->dataReader, NULL, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1595,7 +1601,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
|
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
|
||||||
// pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
|
// pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
|
||||||
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
|
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||||
|
taosMemoryFreeClear(pTaskInfo->streamInfo.pExtSchemas);
|
||||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||||
|
pTaskInfo->streamInfo.pExtSchemas = mtInfo.pExtSchemas;
|
||||||
|
|
||||||
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
|
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
|
|
@ -261,6 +261,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
|
||||||
|
|
||||||
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
|
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
|
||||||
tDeleteSchemaWrapper(pStreamInfo->schema);
|
tDeleteSchemaWrapper(pStreamInfo->schema);
|
||||||
|
taosMemoryFreeClear(pStreamInfo->pExtSchemas);
|
||||||
tOffsetDestroy(&pStreamInfo->currentOffset);
|
tOffsetDestroy(&pStreamInfo->currentOffset);
|
||||||
tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
|
tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
|
||||||
taosMemoryFree(pStreamInfo->stbFullName);
|
taosMemoryFree(pStreamInfo->stbFullName);
|
||||||
|
|
|
@ -4150,7 +4150,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
code = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext, &mtInfo);
|
code = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext, &mtInfo);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
STqOffsetVal offset = {0};
|
STqOffsetVal offset = {0};
|
||||||
|
@ -4162,7 +4162,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN, val);
|
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN, val);
|
||||||
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
|
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
|
||||||
}
|
}
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
destroyMetaTableInfo(&mtInfo);
|
||||||
code = qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
|
code = qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
(*ppRes) = NULL;
|
(*ppRes) = NULL;
|
||||||
|
|
Loading…
Reference in New Issue