diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index bcb40f4175..292e9a3181 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo { SColumnInfoData* pColData; } SBlockOrderInfo; +#define BLOCK_VERSION_1 1 +#define BLOCK_VERSION_2 2 + #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1b438ba026..587e2f9f3e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -207,9 +207,6 @@ typedef enum _mgmt_table { #define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE -#define TD_REQ_FROM_APP 0 -#define TD_REQ_FROM_TAOX 1 - typedef enum ENodeType { // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, // VALUE, OPERATOR, FUNCTION and so on. @@ -759,7 +756,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; - int8_t source; // 1-taosX or 0-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient int8_t reserved[6]; tb_uid_t suid; int64_t delay1; @@ -802,7 +799,7 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; - int8_t source; // 1-taosX or 0-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient int8_t reserved[6]; tb_uid_t suid; int32_t sqlLen; @@ -2661,6 +2658,7 @@ typedef struct SVCreateStbReq { SRSmaParam rsmaParam; int32_t alterOriDataLen; void* alterOriData; + int8_t source; } SVCreateStbReq; int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq); @@ -2730,6 +2728,7 @@ typedef struct { SVCreateTbReq* pReqs; SArray* pArray; }; + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVCreateTbBatchReq; int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); @@ -2822,6 +2821,7 @@ typedef struct { int32_t newCommentLen; char* newComment; int64_t ctimeMs; // fill by vnode + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVAlterTbReq; int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); @@ -3919,12 +3919,13 @@ int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp); int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); +#define TD_REQ_FROM_APP 0x0 #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_FROM_FILE 0x4 +#define TD_REQ_FROM_TAOX 0x8 -#define SOURCE_NULL 0 -#define SOURCE_TAOSX 1 +#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility typedef struct { int32_t flags; @@ -3937,7 +3938,6 @@ typedef struct { SArray* aCol; }; int64_t ctimeMs; - int8_t source; } SSubmitTbData; typedef struct { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 971ed407f9..68eb569dfc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1850,7 +1850,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i char* pStart = p + len; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { int32_t* offset = (int32_t*)pStart; @@ -1949,8 +1949,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; - int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i]; if (ASSERT(colLen < dataLen)) { tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -2009,7 +2009,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = (blockVersion == 1) ? htonl(len) : len; + colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -2098,7 +2098,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - if(blockVersion == 1){ + if(blockVersion == BLOCK_VERSION_1){ colLength[i] = htonl(colLength[i]); } if (colLength[i] >= dataLen) { diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 448243cc3d..f143624bab 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1001,6 +1001,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); taosArrayPush(tBatch.req.pArray, pCreateReq); + tBatch.req.source = TD_REQ_FROM_TAOX; taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)); } else { // add to the correct vgroup @@ -1276,7 +1277,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { return terrno; } SVAlterTbReq req = {0}; - SDecoder coder = {0}; + SDecoder dcoder = {0}; int32_t code = TSDB_CODE_SUCCESS; SRequestObj* pRequest = NULL; SQuery* pQuery = NULL; @@ -1297,8 +1298,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { // decode and process req void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); int32_t len = metaLen - sizeof(SMsgHead); - tDecoderInit(&coder, data, len); - if (tDecodeSVAlterTbReq(&coder, &req) < 0) { + tDecoderInit(&dcoder, data, len); + if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; goto end; } @@ -1340,14 +1341,36 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } pVgData->vg = pInfo; - pVgData->pData = taosMemoryMalloc(metaLen); - if (NULL == pVgData->pData) { + + int tlen = 0; + req.source = TD_REQ_FROM_TAOX; + tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code); + if(code != 0){ code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - memcpy(pVgData->pData, meta, metaLen); - ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId); - pVgData->size = metaLen; + tlen += sizeof(SMsgHead); + void* pMsg = taosMemoryMalloc(tlen); + if (NULL == pMsg) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId); + ((SMsgHead*)pMsg)->contLen = htonl(tlen); + void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); + SEncoder coder = {0}; + tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead)); + code = tEncodeSVAlterTbReq(&coder, &req); + if(code != 0){ + tEncoderClear(&coder); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + tEncoderClear(&coder); + + pVgData->pData = pMsg; + pVgData->size = tlen; + pVgData->numOfTables = 1; taosArrayPush(pArray, &pVgData); @@ -1387,7 +1410,7 @@ end: if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); destroyRequest(pRequest); - tDecoderClear(&coder); + tDecoderClear(&dcoder); qDestroyQuery(pQuery); terrno = code; return code; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1904874a0b..9b74456da2 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -389,7 +389,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } if (strcasecmp(key, "msg.consume.excluded") == 0) { - conf->sourceExcluded = taosStr2int64(value); + conf->sourceExcluded = (taosStr2int64(value) != 0) ? TD_REQ_FROM_TAOX : 0; return TMQ_CONF_OK; } @@ -1611,17 +1611,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { - SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); - pRspObj->resType = RES_TYPE__TMQ; +void changeByteEndian(char* pData){ + char* p = pData; + + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + // version: + int32_t blockVersion = *(int32_t*)p; + ASSERT(blockVersion == BLOCK_VERSION_1); + *(int32_t*)p = BLOCK_VERSION_2; + + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(int32_t); + int32_t cols = *(int32_t*)p; + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(uint64_t); + // check fields + p += cols * (sizeof(int8_t) + sizeof(int32_t)); + + int32_t* colLength = (int32_t*)p; + for (int32_t i = 0; i < cols; ++i) { + colLength[i] = htonl(colLength[i]); + } +} + +static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) { (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; @@ -1633,41 +1655,44 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, } // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); - int64_t rows = htobe64(pRetrieve->numOfRows); + void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i); + void* rawData = NULL; + int64_t rows = 0; + // deal with compatibility + if(*(int64_t*)pRetrieve == 0){ + rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); + }else if(*(int64_t*)pRetrieve == 1){ + rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); + } + pVg->numOfRows += rows; (*numOfRows) += rows; - - if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable - SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); - if (schema) { + changeByteEndian(rawData); + if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable + SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); + if(schema){ taosArrayPush(pRspObj->rsp.blockSchema, &schema); } } } +} +SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { + SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); + pRspObj->resType = RES_TYPE__TMQ; + memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj); return pRspObj; } SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); pRspObj->resType = RES_TYPE__TMQ_METADATA; - tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); - tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); - pRspObj->vgId = pWrapper->vgHandle->vgId; - pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); - pRspObj->resInfo.totalRows = 0; - pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - - // extract the rows in this data packet - for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); - int64_t rows = htobe64(pRetrieve->numOfRows); - pVg->numOfRows += rows; - (*numOfRows) += rows; - } + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj); return pRspObj; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9439c172c4..9a39812bc9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2196,7 +2196,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { // todo extract method int32_t* version = (int32_t*)data; - *version = 2; + *version = BLOCK_VERSION_1; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2277,7 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } -// colSizes[col] = htonl(colSizes[col]); + colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } @@ -2342,9 +2342,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - if(version == 1){ - colLen[i] = htonl(colLen[i]); - } + colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2ffa12f2c1..85f5d462c7 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7513,6 +7513,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { if (pReq->alterOriDataLen > 0) { if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1; } + if (tEncodeI8(pCoder, pReq->source) < 0) return -1; tEndEncode(pCoder); return 0; @@ -7535,6 +7536,10 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pReq->source) < 0) return -1; + } + tEndDecode(pCoder); return 0; } @@ -7663,6 +7668,8 @@ int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq) if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1; } + if (tEncodeI8(pCoder, pReq->source) < 0) return -1; + tEndEncode(pCoder); return 0; } @@ -7677,6 +7684,10 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) { if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pReq->source) < 0) return -1; + } + tEndDecode(pCoder); return 0; } @@ -8034,6 +8045,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { break; } if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->source) < 0) return -1; tEndEncode(pEncoder); return 0; @@ -8094,6 +8106,9 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { if (!tDecodeIsEnd(pDecoder)) { if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1; } + if (!tDecodeIsEnd(pDecoder)) { + if (tDecodeI8(pDecoder, &pReq->source) < 0) return -1; + } tEndDecode(pDecoder); return 0; @@ -8670,7 +8685,6 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm } } if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1; - if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1; tEndEncode(pCoder); return 0; @@ -8758,12 +8772,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa goto _exit; } } - if (!tDecodeIsEnd(pCoder)) { - if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - } tEndDecode(pCoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fc58fabad5..4a092057ce 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -464,6 +464,7 @@ typedef struct { char* pAst1; char* pAst2; SRWLatch lock; + int8_t source; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1ddd2f34e6..aad1dc881b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -458,6 +458,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3 req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.alterOriData = alterOriData; req.alterOriDataLen = alterOriDataLen; + req.source = pStb->source; // todo req.schemaRow.nCols = pStb->numOfColumns; req.schemaRow.version = pStb->colVer; @@ -774,7 +775,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->createdTime = taosGetTimestampMs(); pDst->updateTime = pDst->createdTime; pDst->uid = - (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX) + ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->tagVer = 1; pDst->colVer = 1; @@ -790,6 +792,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->numOfFuncs = pCreate->numOfFuncs; pDst->commentLen = pCreate->commentLen; pDst->pFuncs = pCreate->pFuncs; + pDst->source = pCreate->source; pCreate->pFuncs = NULL; if (pDst->commentLen > 0) { @@ -1033,6 +1036,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq memcpy(pDst, pStb, sizeof(SStbObj)); taosRUnLockLatch(&pStb->lock); + pDst->source = createReq->source; pDst->updateTime = taosGetTimestampMs(); pDst->numOfColumns = createReq->numOfColumns; pDst->numOfTags = createReq->numOfTags; @@ -1141,7 +1145,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { goto _OVER; - } else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) { + } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) { mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); code = 0; goto _OVER; @@ -2572,7 +2576,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { } } - if (dropReq.source == TD_REQ_FROM_TAOX && pStb->uid != dropReq.suid) { + if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) { code = 0; goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 5e5a3626a4..8a2313370a 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -2484,7 +2484,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false); - char objName[20] = {0}; + char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)objName, false); diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 54d859992c..fb898c02f8 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * continue; } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE}; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 727157a2f8..7a737fbb5d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -392,7 +392,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { pReader->msg.ver); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if ((pSubmitTbData->source & sourceExcluded) != 0) { + if ((pSubmitTbData->flags & sourceExcluded) != 0) { pReader->nextBlk += 1; continue; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index f108cedcf5..103007eb57 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -267,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table; } - if ((pSubmitTbDataRet->source & sourceExcluded) != 0) { + if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { goto loop_table; } if (pRsp->withTbName) { @@ -335,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db; } - if ((pSubmitTbDataRet->source & sourceExcluded) != 0) { + if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { goto loop_db; } if (pRsp->withTbName) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index b56bf3e0fe..42bd1bc59c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -815,7 +815,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { continue; @@ -860,7 +860,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { pTask->execInfo.sink.numOfBlocks += 1; uint64_t groupId = pDataBlock->info.id.groupId; - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); if (index == NULL) { // no data yet, append it diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 31267dbf52..dad1211326 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -171,6 +171,22 @@ end : { } } +#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \ + SDecoder decoder = {0};\ + TYPE req = {0}; \ + void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ + int32_t len = pHead->bodyLen - sizeof(SMsgHead); \ + tDecoderInit(&decoder, data, len); \ + if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \ + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ + pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ + fetchVer++; \ + tDecoderClear(&decoder); \ + continue; \ + } \ + tDecoderClear(&decoder); + + static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int code = 0; @@ -239,6 +255,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } + if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { + if (pHead->msgType == TDMT_VND_CREATE_TABLE) { + PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq) + } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) { + PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq) + } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { + PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) + } else if (pHead->msgType == TDMT_VND_DELETE) { + fetchVer++; + continue; + } + } + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); metaRsp.resMsgType = pHead->msgType; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 689ba54ca6..294c2730df 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, (void**)&pCurWin->winInfo.pStatePos, &size); } } + if (ts < pCurWin->winInfo.sessionWin.win.ekey) { + pBuffInfo->rebuildWindow = true; + } } else { code = pAggSup->stateStore.streamStateCountWinAddIfNotExist( pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 8ed255313e..fbde32a701 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1543,10 +1543,13 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; - if ((pOrigOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || - (pOrigOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { - continue; - } + if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || + (pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { + if (bExtractedBlock) { + blockDataDestroy(pBlk); + } + continue; + } } if (pBlk != NULL) { @@ -1572,10 +1575,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); if (code != TSDB_CODE_SUCCESS) { - tSimpleHashCleanup(mUidBlk); - taosArrayDestroy(aBlkSort); - taosArrayDestroy(aExtSrc); - return code; + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + break; } int64_t el = taosGetTimestampUs() - p; @@ -1588,6 +1592,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { szSort = 0; qDebug("source %zu created", taosArrayGetSize(aExtSrc)); } + if (pBlk == NULL) { break; } @@ -1603,6 +1608,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } tSimpleHashCleanup(mUidBlk); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); if (!tsortIsClosed(pHandle)) { @@ -1614,7 +1622,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { freeExtRowMemFileWriteBuf(pHandle); } pHandle->type = SORT_SINGLESOURCE_SORT; - return TSDB_CODE_SUCCESS; + return code; } static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index 1ee835573b..eff74f93d2 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -175,4 +175,8 @@ IF(NOT TD_DARWIN) NAME idxFstUT COMMAND idxFstUT ) + add_test( + NAME idxFstTest + COMMAND idxFstTest + ) ENDIF () diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index b889a2209a..adf783d2eb 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -1,4 +1,5 @@ +#include #include #include #include @@ -14,6 +15,12 @@ #include "tutil.h" void* callback(void* s) { return s; } +class FstEnv : public ::testing::Test { + protected: + virtual void SetUp() {} + virtual void TearDown() {} +}; + static std::string fileName = TD_TMP_DIR_PATH "tindex.tindex"; class FstWriter { public: @@ -154,7 +161,7 @@ class FstReadMemory { int32_t _size; }; -#define L 100 +#define L 200 #define M 100 #define N 100 @@ -200,7 +207,7 @@ void checkMillonWriteAndReadOfFst() { FstWriter* fw = new FstWriter; Performance_fstWriteRecords(fw); delete fw; - FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024); + FstReadMemory* fr = new FstReadMemory(1024 * 8 * 1024); if (fr->init()) { printf("success to init fst read"); @@ -637,23 +644,31 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) { tfileIteratorDestroy(iter); } -int main(int argc, char* argv[]) { - // tool to check all kind of fst test - // if (argc > 1) { validateTFile(argv[1]); } - // if (argc > 4) { - // path suid colName ver - // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); - //} - checkFstCheckIterator1(); - // checkFstCheckIterator2(); - // checkFstCheckIteratorPrefix(); - // checkFstCheckIteratorRange1(); - // checkFstCheckIteratorRange2(); - // checkFstCheckIteratorRange3(); - // checkFstLongTerm(); - // checkFstPrefixSearch(); +// int main(int argc, char* argv[]) { +// // tool to check all kind of fst test +// // if (argc > 1) { validateTFile(argv[1]); } +// // if (argc > 4) { +// // path suid colName ver +// // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); +// //} +// checkFstCheckIterator1(); +// // checkFstCheckIterator2(); +// // checkFstCheckIteratorPrefix(); +// // checkFstCheckIteratorRange1(); +// // checkFstCheckIteratorRange2(); +// // checkFstCheckIteratorRange3(); +// // checkFstLongTerm(); +// // checkFstPrefixSearch(); - // checkMillonWriteAndReadOfFst(); +// // checkMillonWriteAndReadOfFst(); - return 1; -} +// return 1; +// } +TEST_F(FstEnv, checkIterator1) { checkFstCheckIterator1(); } +TEST_F(FstEnv, checkItertor2) { checkFstCheckIterator2(); } +TEST_F(FstEnv, checkPrefix) { checkFstCheckIteratorPrefix(); } +TEST_F(FstEnv, checkRange1) { checkFstCheckIteratorRange1(); } +TEST_F(FstEnv, checkRange2) { checkFstCheckIteratorRange2(); } +TEST_F(FstEnv, checkRange3) { checkFstCheckIteratorRange3(); } +TEST_F(FstEnv, checkLongTerm) { checkFstLongTerm(); } +TEST_F(FstEnv, checkMillonWriteData) { checkMillonWriteAndReadOfFst(); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 8900ef8e7f..808ab71b92 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -285,7 +285,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) { pTmp->suid = pSrc->suid; pTmp->uid = pSrc->uid; pTmp->sver = pSrc->sver; - pTmp->source = pSrc->source; pTmp->pCreateTbReq = NULL; if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (pSrc->pCreateTbReq) { @@ -653,7 +652,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } - pTableCxt->pData->source = SOURCE_TAOSX; + pTableCxt->pData->flags |= TD_REQ_FROM_TAOX; if(tmp == NULL){ ret = initTableColSubmitData(pTableCxt); if (ret != TSDB_CODE_SUCCESS) { @@ -721,7 +720,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[j]); } else { pStart += colLength[j]; @@ -752,7 +751,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[i]); } else { pStart += colLength[i]; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index e2aae130e5..3d0241df75 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS void* pFileStore = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey); if (pCur) { + pCur->pStreamFileState = pFileState; SSessionKey key = {0}; void* pVal = NULL; int len = 0; diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index eadd9a2413..f90509bec1 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -21,7 +21,7 @@ #include "tutil.h" static int32_t initForwardBackwardPtr(SSkipList *pSkipList); -static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur); +static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur); static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListCorrectLevel(SSkipList *pSkipList); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); @@ -131,12 +131,14 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { return pNode; } +#ifdef BUILD_NO_CALL + void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) { SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; bool hasDup = false; - char *pKey = NULL; - char *pDataKey = NULL; + char * pKey = NULL; + char * pDataKey = NULL; int32_t compare = 0; tSkipListWLock(pSkipList); @@ -260,6 +262,7 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { tSkipListCorrectLevel(pSkipList); tSkipListUnlock(pSkipList); } +#endif SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) { if (pSkipList == NULL) return NULL; @@ -350,6 +353,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) { return NULL; } +#ifdef BUILD_NO_CALL void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { return; @@ -358,7 +362,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); int32_t id = 1; - char *prev = NULL; + char * prev = NULL; while (p != pSkipList->pTail) { char *key = SL_GET_NODE_KEY(pSkipList, p); @@ -392,6 +396,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1); } } +#endif static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) { for (int32_t i = 0; i < pNode->level; ++i) { @@ -460,7 +465,7 @@ static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) { static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) { int32_t compare = 0; bool hasDupKey = false; - char *pDataKey = pSkipList->keyFn(pData); + char * pDataKey = pSkipList->keyFn(pData); if (pSkipList->size == 0) { for (int32_t i = 0; i < pSkipList->maxLevel; i++) { @@ -516,6 +521,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, return hasDupKey; } +#ifdef BUILD_NO_CALL static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { int32_t level = pNode->level; uint8_t dupMode = SL_DUP_MODE(pSkipList); @@ -540,6 +546,7 @@ static void tSkipListCorrectLevel(SSkipList *pSkipList) { pSkipList->level -= 1; } } +#endif UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index cc15df33a1..12f1f62f63 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -49,6 +49,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tms_memleak.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3 diff --git a/tests/system-test/2-query/tms_memleak.py b/tests/system-test/2-query/tms_memleak.py new file mode 100644 index 0000000000..0d5cdd8272 --- /dev/null +++ b/tests/system-test/2-query/tms_memleak.py @@ -0,0 +1,51 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + +class TDTestCase: + def caseDescription(self): + ''' + case1: [TD-] + ''' + return + + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use tms_memleak") + + def run(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists tms_memleak") + tdSql.execute("create database if not exists tms_memleak") + tdSql.execute('use tms_memleak') + + tdSql.execute('create table st(ts timestamp, f int) tags (t int);') + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1)('2021-04-19 00:00:02', 2)('2021-04-19 00:00:03', 3)('2021-04-19 00:00:04', 4)") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-20 00:00:01', 5)('2021-04-20 00:00:02', 6)('2021-04-20 00:00:03', 7)('2021-04-20 00:00:04', 8)") + + tdSql.execute("insert into ct3 using st tags(3) values('2021-04-21 00:00:01', 5)('2021-04-21 00:00:02', 6)('2021-04-21 00:00:03', 7)('2021-04-21 00:00:04', 8)") + + tdSql.execute("insert into ct4 using st tags(4) values('2021-04-22 00:00:01', 5)('2021-04-22 00:00:02', 6)('2021-04-22 00:00:03', 7)('2021-04-22 00:00:04', 8)") + + tdSql.query("select * from st order by ts limit 1 "); + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1); + + tdSql.execute('drop database tms_memleak') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index ac8a6f377c..c547385d70 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -323,7 +323,7 @@ class TDTestCase: tdSql.query("select * from st") tdSql.checkRows(8) - tdSql.execute(f'create topic topic_excluded with meta as database d1') + tdSql.execute(f'create topic topic_all with meta as database d1') consumer_dict = { "group.id": "g1", "td.connect.user": "root", @@ -333,7 +333,7 @@ class TDTestCase: consumer = Consumer(consumer_dict) try: - consumer.subscribe(["topic_excluded"]) + consumer.subscribe(["topic_all"]) except TmqError: tdLog.exit(f"subscribe error") diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 056b7dc6cf..2257089f06 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -20,6 +20,7 @@ #include #include "taos.h" #include "types.h" +#include "tmsg.h" static int running = 1; TdFilePtr g_fp = NULL; @@ -966,7 +967,14 @@ void testConsumeExcluded(int topic_type){ tmq_raw_data raw = {0}; tmq_get_raw(msg, &raw); if(topic_type == 1){ - assert(raw.raw_type != 2 && raw.raw_type != 4); + assert(raw.raw_type != 2 && raw.raw_type != 4 && + raw.raw_type != TDMT_VND_CREATE_STB && + raw.raw_type != TDMT_VND_ALTER_STB && + raw.raw_type != TDMT_VND_CREATE_TABLE && + raw.raw_type != TDMT_VND_ALTER_TABLE && + raw.raw_type != TDMT_VND_DELETE); + assert(raw.raw_type == TDMT_VND_DROP_STB || + raw.raw_type == TDMT_VND_DROP_TABLE); }else if(topic_type == 2){ assert(0); }