Merge pull request #13026 from taosdata/feat/two_versions
feat: seperate data schema version and tag schema version
This commit is contained in:
commit
b911b5ba1c
|
@ -300,9 +300,7 @@ typedef struct SSchema {
|
|||
|
||||
typedef struct {
|
||||
int32_t nCols;
|
||||
int32_t sver;
|
||||
int32_t tagVer;
|
||||
int32_t colVer;
|
||||
int32_t version;
|
||||
SSchema* pSchema;
|
||||
} SSchemaWrapper;
|
||||
|
||||
|
@ -310,9 +308,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
|
|||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||
if (pSW == NULL) return pSW;
|
||||
pSW->nCols = pSchemaWrapper->nCols;
|
||||
pSW->sver = pSchemaWrapper->sver;
|
||||
pSW->tagVer = pSchemaWrapper->tagVer;
|
||||
pSW->colVer = pSchemaWrapper->colVer;
|
||||
pSW->version = pSchemaWrapper->version;
|
||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||
if (pSW->pSchema == NULL) {
|
||||
taosMemoryFree(pSW);
|
||||
|
@ -367,9 +363,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema)
|
|||
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeVariantI32(buf, pSW->nCols);
|
||||
tlen += taosEncodeVariantI32(buf, pSW->sver);
|
||||
tlen += taosEncodeVariantI32(buf, pSW->tagVer);
|
||||
tlen += taosEncodeVariantI32(buf, pSW->colVer);
|
||||
tlen += taosEncodeVariantI32(buf, pSW->version);
|
||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
|
||||
}
|
||||
|
@ -378,9 +372,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
|||
|
||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
||||
buf = taosDecodeVariantI32(buf, &pSW->sver);
|
||||
buf = taosDecodeVariantI32(buf, &pSW->tagVer);
|
||||
buf = taosDecodeVariantI32(buf, &pSW->colVer);
|
||||
buf = taosDecodeVariantI32(buf, &pSW->version);
|
||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||
if (pSW->pSchema == NULL) {
|
||||
return NULL;
|
||||
|
@ -394,9 +386,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
|
|||
|
||||
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
||||
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pSW->version) < 0) return -1;
|
||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
|
||||
}
|
||||
|
@ -406,9 +396,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch
|
|||
|
||||
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1;
|
||||
|
||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||
if (pSW->pSchema == NULL) return -1;
|
||||
|
@ -421,9 +409,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra
|
|||
|
||||
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1;
|
||||
|
||||
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
|
||||
if (pSW->pSchema == NULL) return -1;
|
||||
|
@ -1713,7 +1699,7 @@ typedef struct SVCreateStbReq {
|
|||
char* name;
|
||||
tb_uid_t suid;
|
||||
int8_t rollup;
|
||||
SSchemaWrapper schema;
|
||||
SSchemaWrapper schemaRow;
|
||||
SSchemaWrapper schemaTag;
|
||||
SRSmaParam pRSmaParam;
|
||||
} SVCreateStbReq;
|
||||
|
@ -1745,7 +1731,7 @@ typedef struct SVCreateTbReq {
|
|||
uint8_t* pTag;
|
||||
} ctb;
|
||||
struct {
|
||||
SSchemaWrapper schema;
|
||||
SSchemaWrapper schemaRow;
|
||||
} ntb;
|
||||
};
|
||||
} SVCreateTbReq;
|
||||
|
|
|
@ -3801,7 +3801,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
|
|||
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
||||
if (pReq->rollup) {
|
||||
if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
|
||||
|
@ -3817,7 +3817,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
|
|||
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
||||
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->schema) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1;
|
||||
if (pReq->rollup) {
|
||||
if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1;
|
||||
|
@ -3866,7 +3866,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
|
|||
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
|
||||
if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1;
|
||||
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -3892,7 +3892,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
|
|||
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
|
||||
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1;
|
||||
} else if (pReq->type == TSDB_NORMAL_TABLE) {
|
||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schema) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -388,13 +388,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.name = (char *)tNameGetTableName(&name);
|
||||
req.suid = pStb->uid;
|
||||
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
||||
req.schema.nCols = pStb->numOfColumns;
|
||||
req.schema.sver = pStb->version;
|
||||
req.schema.tagVer = pStb->tagVer;
|
||||
req.schema.colVer = pStb->colVer;
|
||||
req.schema.pSchema = pStb->pColumns;
|
||||
// todo
|
||||
req.schemaRow.nCols = pStb->numOfColumns;
|
||||
req.schemaRow.version = pStb->version;
|
||||
req.schemaRow.pSchema = pStb->pColumns;
|
||||
req.schemaTag.nCols = pStb->numOfTags;
|
||||
req.schemaTag.sver = 1;
|
||||
req.schemaTag.version = pStb->tagVer;
|
||||
req.schemaTag.pSchema = pStb->pTags;
|
||||
|
||||
if (req.rollup) {
|
||||
|
|
|
@ -217,7 +217,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
}
|
||||
} else {
|
||||
pTopic->schema.nCols = 0;
|
||||
pTopic->schema.sver = 0;
|
||||
pTopic->schema.version = 0;
|
||||
pTopic->schema.pSchema = NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -182,7 +182,7 @@ struct SMetaEntry {
|
|||
char *name;
|
||||
union {
|
||||
struct {
|
||||
SSchemaWrapper schema;
|
||||
SSchemaWrapper schemaRow;
|
||||
SSchemaWrapper schemaTag;
|
||||
} stbEntry;
|
||||
struct {
|
||||
|
@ -195,7 +195,7 @@ struct SMetaEntry {
|
|||
int64_t ctime;
|
||||
int32_t ttlDays;
|
||||
int32_t ncid; // next column id
|
||||
SSchemaWrapper schema;
|
||||
SSchemaWrapper schemaRow;
|
||||
} ntbEntry;
|
||||
struct {
|
||||
STSma *tsma;
|
||||
|
|
|
@ -24,7 +24,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
|
||||
|
||||
if (pME->type == TSDB_SUPER_TABLE) {
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
|
||||
|
@ -35,7 +35,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
|
||||
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
|
||||
if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
|
||||
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||
if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1;
|
||||
} else {
|
||||
|
@ -56,7 +56,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
||||
|
||||
if (pME->type == TSDB_SUPER_TABLE) {
|
||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
||||
|
@ -67,7 +67,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
||||
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
||||
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1;
|
||||
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
|
||||
if (!pME->smaEntry.tsma) {
|
||||
|
|
|
@ -56,7 +56,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
me.type = TSDB_SUPER_TABLE;
|
||||
me.uid = pReq->suid;
|
||||
me.name = pReq->name;
|
||||
me.stbEntry.schema = pReq->schema;
|
||||
me.stbEntry.schemaRow = pReq->schemaRow;
|
||||
me.stbEntry.schemaTag = pReq->schemaTag;
|
||||
|
||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||
|
@ -182,15 +182,13 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
nStbEntry.type = TSDB_SUPER_TABLE;
|
||||
nStbEntry.uid = pReq->suid;
|
||||
nStbEntry.name = pReq->name;
|
||||
nStbEntry.stbEntry.schema = pReq->schema;
|
||||
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
|
||||
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
||||
|
||||
metaWLock(pMeta);
|
||||
// compare two entry
|
||||
if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) {
|
||||
if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) {
|
||||
metaSaveToSkmDb(pMeta, &nStbEntry);
|
||||
}
|
||||
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
|
||||
metaSaveToSkmDb(pMeta, &nStbEntry);
|
||||
}
|
||||
|
||||
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
|
||||
|
@ -247,8 +245,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
|||
} else {
|
||||
me.ntbEntry.ctime = pReq->ctime;
|
||||
me.ntbEntry.ttlDays = pReq->ttl;
|
||||
me.ntbEntry.schema = pReq->ntb.schema;
|
||||
me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1;
|
||||
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
|
||||
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
|
||||
}
|
||||
|
||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||
|
@ -381,7 +379,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
}
|
||||
|
||||
// search the column to add/drop/update
|
||||
pSchema = &entry.ntbEntry.schema;
|
||||
pSchema = &entry.ntbEntry.schemaRow;
|
||||
int32_t iCol = 0;
|
||||
for (;;) {
|
||||
pColumn = NULL;
|
||||
|
@ -402,16 +400,16 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->sver++;
|
||||
pSchema->version++;
|
||||
pSchema->nCols++;
|
||||
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
|
||||
memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1));
|
||||
pSchema->pSchema = pNewSchema;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++;
|
||||
strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName);
|
||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes;
|
||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type;
|
||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags;
|
||||
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++;
|
||||
strcpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
if (pColumn == NULL) {
|
||||
|
@ -422,7 +420,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->sver++;
|
||||
pSchema->version++;
|
||||
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
||||
if (tlen) {
|
||||
memmove(pColumn, pColumn + 1, tlen);
|
||||
|
@ -438,7 +436,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->sver++;
|
||||
pSchema->version++;
|
||||
pColumn->bytes = pAlterTbReq->colModBytes;
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
|
||||
|
@ -446,7 +444,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->sver++;
|
||||
pSchema->version++;
|
||||
strcpy(pColumn->name, pAlterTbReq->colNewName);
|
||||
break;
|
||||
}
|
||||
|
@ -813,15 +811,15 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
const SSchemaWrapper *pSW;
|
||||
|
||||
if (pME->type == TSDB_SUPER_TABLE) {
|
||||
pSW = &pME->stbEntry.schema;
|
||||
pSW = &pME->stbEntry.schemaRow;
|
||||
} else if (pME->type == TSDB_NORMAL_TABLE) {
|
||||
pSW = &pME->ntbEntry.schema;
|
||||
pSW = &pME->ntbEntry.schemaRow;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
skmDbKey.uid = pME->uid;
|
||||
skmDbKey.sver = pSW->sver;
|
||||
skmDbKey.sver = pSW->version;
|
||||
|
||||
// encode schema
|
||||
int32_t ret = 0;
|
||||
|
|
|
@ -324,10 +324,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
|
|||
if(pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name);
|
||||
|
||||
if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||
sverNew = mr.me.ntbEntry.schema.sver;
|
||||
sverNew = mr.me.ntbEntry.schemaRow.version;
|
||||
} else {
|
||||
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
|
||||
sverNew = mr.me.stbEntry.schema.sver;
|
||||
sverNew = mr.me.stbEntry.schemaRow.version;
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
if (mer1.me.type == TSDB_SUPER_TABLE) {
|
||||
strcpy(metaRsp.stbName, mer1.me.name);
|
||||
schema = mer1.me.stbEntry.schema;
|
||||
schema = mer1.me.stbEntry.schemaRow;
|
||||
schemaTag = mer1.me.stbEntry.schemaTag;
|
||||
metaRsp.suid = mer1.me.uid;
|
||||
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||
|
@ -73,10 +73,10 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
strcpy(metaRsp.stbName, mer2.me.name);
|
||||
metaRsp.suid = mer2.me.uid;
|
||||
schema = mer2.me.stbEntry.schema;
|
||||
schema = mer2.me.stbEntry.schemaRow;
|
||||
schemaTag = mer2.me.stbEntry.schemaTag;
|
||||
} else if (mer1.me.type == TSDB_NORMAL_TABLE) {
|
||||
schema = mer1.me.ntbEntry.schema;
|
||||
schema = mer1.me.ntbEntry.schemaRow;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
metaRsp.numOfTags = schemaTag.nCols;
|
||||
metaRsp.numOfColumns = schema.nCols;
|
||||
metaRsp.precision = pVnode->config.tsdbCfg.precision;
|
||||
metaRsp.sversion = schema.sver;
|
||||
metaRsp.sversion = schema.version;
|
||||
metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
|
||||
|
||||
memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
|
||||
|
|
|
@ -124,7 +124,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput)
|
|||
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
|
||||
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0;
|
||||
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
|
||||
if (pOperator->pTaskInfo != NULL) {
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
}
|
||||
|
@ -2717,7 +2717,7 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
|
|||
SExchangeInfo* pExchangeInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
|
||||
pLoadInfo->totalElapsed += el;
|
||||
|
@ -3023,13 +3023,13 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
|
|||
|
||||
tsem_init(&pInfo->ready, 0, 0);
|
||||
|
||||
pOperator->name = "ExchangeOperator";
|
||||
pOperator->name = "ExchangeOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfExprs = pBlock->info.numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfExprs = pBlock->info.numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
||||
destroyExchangeOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -3465,7 +3465,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0;
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3490,10 +3490,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
|||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL;
|
||||
size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
return (rows == 0)? NULL:pInfo->pRes;
|
||||
return (rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
|
||||
|
@ -3778,10 +3778,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
if (pOperator->cost.openCost == 0) {
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0;
|
||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||
}
|
||||
|
||||
return (rows > 0)? pInfo->pRes:NULL;
|
||||
return (rows > 0) ? pInfo->pRes : NULL;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
|
||||
|
@ -4455,15 +4455,15 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
|
|||
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
|
||||
|
||||
if (mr.me.type == TSDB_SUPER_TABLE) {
|
||||
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
|
||||
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version;
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
|
||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||
metaGetTableEntryByUid(&mr, suid);
|
||||
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
|
||||
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schemaRow.version;
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
|
||||
} else {
|
||||
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver;
|
||||
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schemaRow.version;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
@ -4668,8 +4668,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||
|
||||
pOptr =
|
||||
createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
|
||||
pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
|
||||
pTaskInfo);
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
|
||||
|
@ -5162,8 +5162,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
|
||||
const char* pDir) {
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) {
|
||||
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
|
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "function.h"
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "querynodes.h"
|
||||
|
@ -142,7 +142,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
|||
return true;
|
||||
}
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
getNextTimeWindow(pInterval, &w, order);
|
||||
if (w.ekey < pBlockInfo->window.skey) {
|
||||
break;
|
||||
|
@ -190,7 +190,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
pCost->skipBlocks += 1;
|
||||
|
||||
// clear all data in pBlock that are set when handing the previous block
|
||||
for(int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) {
|
||||
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
pcol->pData = NULL;
|
||||
}
|
||||
|
@ -324,7 +324,6 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
|
|||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
colDataAppend(pColInfoData, i, p, (p == NULL));
|
||||
}
|
||||
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
taosMemoryFree((void*)p);
|
||||
}
|
||||
|
@ -343,7 +342,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
|
|||
infoData.info.bytes = sizeof(uint64_t);
|
||||
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||
|
||||
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
|
||||
colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid);
|
||||
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
|
||||
|
||||
SScalarParam param = {.columnData = pColInfoData};
|
||||
|
@ -376,7 +375,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st)/1000.0;
|
||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
|
||||
return pBlock;
|
||||
|
@ -409,7 +408,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
||||
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
|
||||
"-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||
"-%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||
|
||||
// do prepare for the next round table scan operation
|
||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||
|
@ -467,7 +467,7 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
|||
|
||||
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
|
||||
STableScanInfo* pTableScanInfo = pOptr->info;
|
||||
STableScanInfo* pTableScanInfo = pOptr->info;
|
||||
*pRecorder = pTableScanInfo->readRecorder;
|
||||
*pOptrExplain = pRecorder;
|
||||
*len = sizeof(SFileBlockLoadRecorder);
|
||||
|
@ -484,7 +484,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -498,7 +499,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||
SArray* pColList =
|
||||
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -507,31 +509,32 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
|
||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
||||
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
|
||||
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
|
||||
}
|
||||
|
||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
|
||||
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
|
||||
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||
pInfo->sampleRatio = pTableScanNode->ratio;
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||
pInfo->sampleRatio = pTableScanNode->ratio;
|
||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
pInfo->dataReader = pDataReader;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||
pInfo->dataReader = pDataReader;
|
||||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
|
||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||
NULL, NULL, getTableScannerExecInfo);
|
||||
|
||||
// for non-blocking operator, the open cost is always 0
|
||||
pOperator->cost.openCost = 0;
|
||||
|
@ -649,32 +652,30 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
|
|||
taosArrayClear(pInfo->pBlockLists);
|
||||
}
|
||||
|
||||
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.pStreamAggSup != NULL;
|
||||
}
|
||||
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.pStreamAggSup != NULL; }
|
||||
|
||||
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
||||
SSDataBlock* pSDB = pInfo->pUpdateRes;
|
||||
if (pInfo->updateResIndex < pSDB->info.rows) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDB->pDataBlock, 0);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
SResultRowInfo dumyInfo;
|
||||
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win;
|
||||
if (isSessionWindow(pInfo)) {
|
||||
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
|
||||
int64_t gap = pInfo->sessionSup.gap;
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup->pResultRows,
|
||||
tsCols[pInfo->updateResIndex], gap, &winIndex);
|
||||
int64_t gap = pInfo->sessionSup.gap;
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin =
|
||||
getSessionTimeWindow(pAggSup->pResultRows, tsCols[pInfo->updateResIndex], gap, &winIndex);
|
||||
win = pCurWin->win;
|
||||
pInfo->updateResIndex += updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows,
|
||||
pInfo->updateResIndex, gap, NULL);
|
||||
pInfo->updateResIndex +=
|
||||
updateSessionWindowInfo(pCurWin, tsCols, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
|
||||
} else {
|
||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex],
|
||||
&pInfo->interval, pInfo->interval.precision, NULL);
|
||||
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex,
|
||||
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[pInfo->updateResIndex], &pInfo->interval,
|
||||
pInfo->interval.precision, NULL);
|
||||
pInfo->updateResIndex += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, pInfo->updateResIndex, win.ekey,
|
||||
binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
}
|
||||
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
|
||||
pTableScanInfo->cond.twindow = win;
|
||||
|
@ -713,8 +714,8 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
|
|||
// p->info.type = STREAM_INVERT;
|
||||
// taosArrayClear(pInfo->tsArray);
|
||||
// return p;
|
||||
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false);
|
||||
SColumnInfoData* pCol = (SColumnInfoData*) taosArrayGet(pDataBlock->pDataBlock, 0);
|
||||
SSDataBlock* pDataBlock = createOneDataBlock(pInfo->pRes, false);
|
||||
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, 0);
|
||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
colInfoDataEnsureCapacity(pCol, 0, size);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
|
@ -737,19 +738,17 @@ void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId
|
|||
pKey[2] = ts;
|
||||
}
|
||||
|
||||
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
||||
int32_t pageId, int32_t tsIndex, int64_t childId) {
|
||||
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t pageId, int32_t tsIndex,
|
||||
int64_t childId) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
|
||||
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
|
||||
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
|
||||
for (int32_t i = 0; i < pDataBlock->info.rows; i++) {
|
||||
setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]);
|
||||
SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable,
|
||||
pSup->pKeyBuf, pSup->keySize);
|
||||
SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize);
|
||||
if (p1 == NULL) {
|
||||
SWindowPosition pos = {.pageId = pageId, .rowId = i};
|
||||
int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos,
|
||||
sizeof(SWindowPosition));
|
||||
if (code != TSDB_CODE_SUCCESS ) {
|
||||
int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos, sizeof(SWindowPosition));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
|
@ -760,24 +759,23 @@ static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
||||
int32_t tsIndex, int64_t childId) {
|
||||
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup, int32_t tsIndex, int64_t childId) {
|
||||
int32_t start = 0;
|
||||
int32_t stop = 0;
|
||||
int32_t pageSize = getBufPageSize(pSup->pDataBuf);
|
||||
while(start < pDataBlock->info.rows) {
|
||||
while (start < pDataBlock->info.rows) {
|
||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
|
||||
SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
|
||||
if (pDB == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
|
||||
void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
|
||||
if (pPage == NULL) {
|
||||
blockDataDestroy(pDB);
|
||||
return terrno;
|
||||
}
|
||||
int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
|
||||
int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
|
||||
assert(size <= pageSize);
|
||||
blockDataToBuf(pPage, pDB);
|
||||
setBufPageDirty(pPage, true);
|
||||
|
@ -785,7 +783,7 @@ static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
|||
blockDataDestroy(pDB);
|
||||
start = stop + 1;
|
||||
int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
|
||||
if (code != TSDB_CODE_SUCCESS ) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -798,16 +796,14 @@ static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
|
|||
blockDataCleanup(pInfo->pRes);
|
||||
SCatchSupporter* pCSup = &pInfo->childAggSup;
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t size = taosArrayGetSize(pInfo->childIds);
|
||||
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t size = taosArrayGetSize(pInfo->childIds);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
int64_t id = *(int64_t *)taosArrayGet(pInfo->childIds, i);
|
||||
setSupKeyBuf(pCSup, pBlock->info.groupId, id,
|
||||
tsCols[pInfo->updateResIndex]);
|
||||
SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable,
|
||||
pCSup->pKeyBuf, pCSup->keySize);
|
||||
void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
|
||||
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
|
||||
int64_t id = *(int64_t*)taosArrayGet(pInfo->childIds, i);
|
||||
setSupKeyBuf(pCSup, pBlock->info.groupId, id, tsCols[pInfo->updateResIndex]);
|
||||
SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable, pCSup->pKeyBuf, pCSup->keySize);
|
||||
void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
|
||||
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
|
||||
blockDataFromBuf(pDB, buf);
|
||||
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
|
||||
blockDataMerge(pInfo->pRes, pSub);
|
||||
|
@ -838,7 +834,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
if (pDB != NULL) {
|
||||
return pDB;
|
||||
} else {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -848,7 +844,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
|
@ -946,7 +942,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
if (rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
|
||||
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); // TODO(liuyao) get invertible from plan
|
||||
if (upRes) {
|
||||
pInfo->pUpdateRes = upRes;
|
||||
if (upRes->info.type == STREAM_REPROCESS) {
|
||||
|
@ -981,7 +977,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
|
|||
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColMatchInfo* id = taosArrayGet(pColList, i);
|
||||
int16_t colId = id->colId;
|
||||
int16_t colId = id->colId;
|
||||
taosArrayPush(pColIds, &colId);
|
||||
}
|
||||
|
||||
|
@ -1005,33 +1001,34 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
|
||||
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
|
||||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
}
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = uid;
|
||||
pInfo->readHandle = *pHandle;
|
||||
pInfo->tableUid = uid;
|
||||
pInfo->streamBlockReader = streamReadHandle;
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->pCondition = pCondition;
|
||||
pInfo->pDataReader = pDataReader;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->pOperatorDumy = pOperatorDumy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->pCondition = pCondition;
|
||||
pInfo->pDataReader = pDataReader;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
pInfo->pOperatorDumy = pOperatorDumy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
|
||||
|
||||
initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
|
||||
initCatchSupporter(&pInfo->childAggSup, 1024, "StreamFinalInterval",
|
||||
"/tmp/"); // TODO(liuyao) get row size from phy plan
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfExprs = pResBlock->info.numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||
|
@ -1296,7 +1293,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// number of columns
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schema.nCols, false);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
|
||||
|
||||
// super table name
|
||||
STR_TO_VARSTR(str, mr.me.name);
|
||||
|
@ -1320,7 +1317,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// number of columns
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schema.nCols, false);
|
||||
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
|
@ -1646,7 +1643,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
|||
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
||||
STR_TO_VARSTR(str, mr.me.name);
|
||||
colDataAppend(pDst, count, str, false);
|
||||
} else { // it is a tag value
|
||||
} else { // it is a tag value
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
const uint8_t* tmp = mr.me.ctbEntry.pTags;
|
||||
// TODO opt perf by realloc memory
|
||||
|
|
|
@ -3943,7 +3943,7 @@ typedef struct SVgroupCreateTableBatch {
|
|||
|
||||
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
||||
taosMemoryFreeClear(pReq->name);
|
||||
taosMemoryFreeClear(pReq->ntb.schema.pSchema);
|
||||
taosMemoryFreeClear(pReq->ntb.schemaRow.pSchema);
|
||||
}
|
||||
|
||||
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo,
|
||||
|
@ -3956,10 +3956,10 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
|||
SVCreateTbReq req = {0};
|
||||
req.type = TD_NORMAL_TABLE;
|
||||
req.name = strdup(pStmt->tableName);
|
||||
req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols);
|
||||
req.ntb.schema.sver = 1;
|
||||
req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema));
|
||||
if (NULL == req.name || NULL == req.ntb.schema.pSchema) {
|
||||
req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols);
|
||||
req.ntb.schemaRow.version = 1;
|
||||
req.ntb.schemaRow.pSchema = taosMemoryCalloc(req.ntb.schemaRow.nCols, sizeof(SSchema));
|
||||
if (NULL == req.name || NULL == req.ntb.schemaRow.pSchema) {
|
||||
destroyCreateTbReq(&req);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -3969,7 +3969,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
|||
SNode* pCol;
|
||||
col_id_t index = 0;
|
||||
FOREACH(pCol, pStmt->pCols) {
|
||||
toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.schema.pSchema + index);
|
||||
toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.schemaRow.pSchema + index);
|
||||
++index;
|
||||
}
|
||||
pBatch->info = *pVgroupInfo;
|
||||
|
@ -4023,7 +4023,7 @@ static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
|
|||
taosMemoryFreeClear(pTableReq->name);
|
||||
|
||||
if (pTableReq->type == TSDB_NORMAL_TABLE) {
|
||||
taosMemoryFreeClear(pTableReq->ntb.schema.pSchema);
|
||||
taosMemoryFreeClear(pTableReq->ntb.schemaRow.pSchema);
|
||||
} else if (pTableReq->type == TSDB_CHILD_TABLE) {
|
||||
taosMemoryFreeClear(pTableReq->ctb.pTag);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue