diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index 1fc6fb7e67..a2e6bca46c 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */ hint: - BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP + BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT select_list: select_expr [, select_expr] ... @@ -87,12 +87,13 @@ Hints are a means of user control over query optimization for individual stateme The list of currently supported Hints is as follows: -| **Hint** | **Params** | **Comment** | **Scopt** | +| **Hint** | **Params** | **Comment** | **Scope** | | :-----------: | -------------- | -------------------------- | -----------------------------------| | BATCH_SCAN | None | Batch table scan | JOIN statment for stable | | NO_BATCH_SCAN | None | Sequential table scan | JOIN statment for stable | | SORT_FOR_GROUP| None | Use sort for partition, conflict with PARTITION_FIRST | With normal column in partition by list | | PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list | +| PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp | For example: @@ -100,6 +101,7 @@ For example: SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; +SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts; ``` ## Lists diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index 2a7dff6f7d..eec947ea23 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -24,7 +24,7 @@ SELECT [hints] [DISTINCT] [TAGS] select_list hints: /*+ [hint([hint_param_list])] [hint([hint_param_list])] */ hint: - BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP + BATCH_SCAN | NO_BATCH_SCAN | SORT_FOR_GROUP | PARA_TABLES_SORT select_list: select_expr [, select_expr] ... @@ -93,13 +93,14 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适 | NO_BATCH_SCAN | 无 | 采用顺序读表的方式 | 超级表 JOIN 语句 | | SORT_FOR_GROUP| 无 | 采用sort方式进行分组, 与PARTITION_FIRST冲突 | partition by 列表有普通列时 | | PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 | - +| PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 | 举例: ```sql SELECT /*+ BATCH_SCAN() */ a.ts FROM stable1 a, stable2 b where a.tag0 = b.tag0 and a.ts = b.ts; SELECT /*+ SORT_FOR_GROUP() */ count(*), c1 FROM stable1 PARTITION BY c1; SELECT /*+ PARTITION_FIRST() */ count(*), c1 FROM stable1 PARTITION BY c1; +SELECT /*+ PARA_TABLES_SORT() */ * from stable1 order by ts; ``` ## 列表 diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index bcb40f4175..292e9a3181 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo { SColumnInfoData* pColData; } SBlockOrderInfo; +#define BLOCK_VERSION_1 1 +#define BLOCK_VERSION_2 2 + #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 605a9f4a1c..c3ab8f0bf2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -207,9 +207,6 @@ typedef enum _mgmt_table { #define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE -#define TD_REQ_FROM_APP 0 -#define TD_REQ_FROM_TAOX 1 - typedef enum ENodeType { // Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN, // VALUE, OPERATOR, FUNCTION and so on. @@ -760,7 +757,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; - int8_t source; // 1-taosX or 0-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient int8_t reserved[6]; tb_uid_t suid; int64_t delay1; @@ -803,7 +800,7 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp* pRsp); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; - int8_t source; // 1-taosX or 0-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient int8_t reserved[6]; tb_uid_t suid; int32_t sqlLen; @@ -1614,7 +1611,6 @@ typedef struct { SEp ep; char active[TSDB_ACTIVE_KEY_LEN]; char connActive[TSDB_CONN_ACTIVE_KEY_LEN]; - char machineId[TSDB_MACHINE_ID_LEN + 1]; } SDnodeInfo; typedef struct { @@ -2662,6 +2658,7 @@ typedef struct SVCreateStbReq { SRSmaParam rsmaParam; int32_t alterOriDataLen; void* alterOriData; + int8_t source; } SVCreateStbReq; int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq); @@ -2731,6 +2728,7 @@ typedef struct { SVCreateTbReq* pReqs; SArray* pArray; }; + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVCreateTbBatchReq; int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); @@ -2823,6 +2821,7 @@ typedef struct { int32_t newCommentLen; char* newComment; int64_t ctimeMs; // fill by vnode + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVAlterTbReq; int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); @@ -3920,12 +3919,13 @@ int32_t tDeatroySMqHbRsp(SMqHbRsp* pRsp); int32_t tSerializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); +#define TD_REQ_FROM_APP 0x0 #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_FROM_FILE 0x4 +#define TD_REQ_FROM_TAOX 0x8 -#define SOURCE_NULL 0 -#define SOURCE_TAOSX 1 +#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility typedef struct { int32_t flags; @@ -3938,7 +3938,6 @@ typedef struct { SArray* aCol; }; int64_t ctimeMs; - int8_t source; } SSubmitTbData; typedef struct { diff --git a/include/util/tbase58.h b/include/util/tbase58.h index e1b03f8a8f..ab62e1926e 100644 --- a/include/util/tbase58.h +++ b/include/util/tbase58.h @@ -22,6 +22,9 @@ extern "C" { #endif +#define TBASE_MAX_ILEN 4096 +#define TBASE_MAX_OLEN 5653 + uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen); char *base58_encode(const uint8_t *value, int32_t vlen); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 971ed407f9..68eb569dfc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1850,7 +1850,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i char* pStart = p + len; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { int32_t* offset = (int32_t*)pStart; @@ -1949,8 +1949,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; - int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i]; if (ASSERT(colLen < dataLen)) { tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -2009,7 +2009,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = (blockVersion == 1) ? htonl(len) : len; + colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -2098,7 +2098,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - if(blockVersion == 1){ + if(blockVersion == BLOCK_VERSION_1){ colLength[i] = htonl(colLength[i]); } if (colLength[i] >= dataLen) { diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 448243cc3d..f143624bab 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1001,6 +1001,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); taosArrayPush(tBatch.req.pArray, pCreateReq); + tBatch.req.source = TD_REQ_FROM_TAOX; taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)); } else { // add to the correct vgroup @@ -1276,7 +1277,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { return terrno; } SVAlterTbReq req = {0}; - SDecoder coder = {0}; + SDecoder dcoder = {0}; int32_t code = TSDB_CODE_SUCCESS; SRequestObj* pRequest = NULL; SQuery* pQuery = NULL; @@ -1297,8 +1298,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { // decode and process req void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); int32_t len = metaLen - sizeof(SMsgHead); - tDecoderInit(&coder, data, len); - if (tDecodeSVAlterTbReq(&coder, &req) < 0) { + tDecoderInit(&dcoder, data, len); + if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; goto end; } @@ -1340,14 +1341,36 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } pVgData->vg = pInfo; - pVgData->pData = taosMemoryMalloc(metaLen); - if (NULL == pVgData->pData) { + + int tlen = 0; + req.source = TD_REQ_FROM_TAOX; + tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code); + if(code != 0){ code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - memcpy(pVgData->pData, meta, metaLen); - ((SMsgHead*)pVgData->pData)->vgId = htonl(pInfo.vgId); - pVgData->size = metaLen; + tlen += sizeof(SMsgHead); + void* pMsg = taosMemoryMalloc(tlen); + if (NULL == pMsg) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId); + ((SMsgHead*)pMsg)->contLen = htonl(tlen); + void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); + SEncoder coder = {0}; + tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead)); + code = tEncodeSVAlterTbReq(&coder, &req); + if(code != 0){ + tEncoderClear(&coder); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + tEncoderClear(&coder); + + pVgData->pData = pMsg; + pVgData->size = tlen; + pVgData->numOfTables = 1; taosArrayPush(pArray, &pVgData); @@ -1387,7 +1410,7 @@ end: if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); destroyRequest(pRequest); - tDecoderClear(&coder); + tDecoderClear(&dcoder); qDestroyQuery(pQuery); terrno = code; return code; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1904874a0b..9b74456da2 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -389,7 +389,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } if (strcasecmp(key, "msg.consume.excluded") == 0) { - conf->sourceExcluded = taosStr2int64(value); + conf->sourceExcluded = (taosStr2int64(value) != 0) ? TD_REQ_FROM_TAOX : 0; return TMQ_CONF_OK; } @@ -1611,17 +1611,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { - SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); - pRspObj->resType = RES_TYPE__TMQ; +void changeByteEndian(char* pData){ + char* p = pData; + + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + // version: + int32_t blockVersion = *(int32_t*)p; + ASSERT(blockVersion == BLOCK_VERSION_1); + *(int32_t*)p = BLOCK_VERSION_2; + + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(int32_t); + int32_t cols = *(int32_t*)p; + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(uint64_t); + // check fields + p += cols * (sizeof(int8_t) + sizeof(int32_t)); + + int32_t* colLength = (int32_t*)p; + for (int32_t i = 0; i < cols; ++i) { + colLength[i] = htonl(colLength[i]); + } +} + +static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) { (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; @@ -1633,41 +1655,44 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, } // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); - int64_t rows = htobe64(pRetrieve->numOfRows); + void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i); + void* rawData = NULL; + int64_t rows = 0; + // deal with compatibility + if(*(int64_t*)pRetrieve == 0){ + rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); + }else if(*(int64_t*)pRetrieve == 1){ + rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); + } + pVg->numOfRows += rows; (*numOfRows) += rows; - - if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable - SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); - if (schema) { + changeByteEndian(rawData); + if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable + SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); + if(schema){ taosArrayPush(pRspObj->rsp.blockSchema, &schema); } } } +} +SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { + SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); + pRspObj->resType = RES_TYPE__TMQ; + memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj); return pRspObj; } SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); pRspObj->resType = RES_TYPE__TMQ_METADATA; - tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); - tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); - pRspObj->vgId = pWrapper->vgHandle->vgId; - pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); - pRspObj->resInfo.totalRows = 0; - pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - - // extract the rows in this data packet - for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); - int64_t rows = htobe64(pRetrieve->numOfRows); - pVg->numOfRows += rows; - (*numOfRows) += rows; - } + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj); return pRspObj; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b97c49e53f..30d73b6bd0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2197,7 +2197,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { // todo extract method int32_t* version = (int32_t*)data; - *version = 2; + *version = BLOCK_VERSION_1; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2278,7 +2278,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } -// colSizes[col] = htonl(colSizes[col]); + colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } @@ -2343,9 +2343,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - if(version == 1){ - colLen[i] = htonl(colLen[i]); - } + colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2ffa12f2c1..85f5d462c7 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7513,6 +7513,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { if (pReq->alterOriDataLen > 0) { if (tEncodeBinary(pCoder, pReq->alterOriData, pReq->alterOriDataLen) < 0) return -1; } + if (tEncodeI8(pCoder, pReq->source) < 0) return -1; tEndEncode(pCoder); return 0; @@ -7535,6 +7536,10 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pReq->source) < 0) return -1; + } + tEndDecode(pCoder); return 0; } @@ -7663,6 +7668,8 @@ int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq) if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1; } + if (tEncodeI8(pCoder, pReq->source) < 0) return -1; + tEndEncode(pCoder); return 0; } @@ -7677,6 +7684,10 @@ int tDecodeSVCreateTbBatchReq(SDecoder *pCoder, SVCreateTbBatchReq *pReq) { if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pReq->source) < 0) return -1; + } + tEndDecode(pCoder); return 0; } @@ -8034,6 +8045,7 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { break; } if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->source) < 0) return -1; tEndEncode(pEncoder); return 0; @@ -8094,6 +8106,9 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { if (!tDecodeIsEnd(pDecoder)) { if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1; } + if (!tDecodeIsEnd(pDecoder)) { + if (tDecodeI8(pDecoder, &pReq->source) < 0) return -1; + } tEndDecode(pDecoder); return 0; @@ -8670,7 +8685,6 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm } } if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1; - if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1; tEndEncode(pCoder); return 0; @@ -8758,12 +8772,6 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa goto _exit; } } - if (!tDecodeIsEnd(pCoder)) { - if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - } tEndDecode(pCoder); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index a3a29f6f77..fa59f56496 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -16,7 +16,6 @@ #define _DEFAULT_SOURCE #include "dmInt.h" #include "systable.h" -#include "tgrant.h" extern SConfig *tsCfg; @@ -118,11 +117,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.memTotal = tsTotalMemoryKB * 1024; req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024; tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); - char *machine = tGetMachineId(); - if (machine) { - tstrncpy(req.machineId, machine, TSDB_MACHINE_ID_LEN + 1); - taosMemoryFreeClear(machine); - } + tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1); req.clusterCfg.statusInterval = tsStatusInterval; req.clusterCfg.checkTime = 0; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 6cbf31b15f..049b1bdf84 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -22,6 +22,7 @@ #ifdef TD_TSZ #include "tcompression.h" #include "tglobal.h" +#include "tgrant.h" #endif static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { @@ -137,6 +138,16 @@ int32_t dmInitVars(SDnode *pDnode) { pData->rebootTime = taosGetTimestampMs(); pData->dropped = 0; pData->stopped = 0; + char *machineId = tGetMachineId(); + if (machineId) { + tstrncpy(pData->machineId, machineId, TSDB_MACHINE_ID_LEN + 1); + taosMemoryFreeClear(machineId); + } else { +#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG) + terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE; + return -1; +#endif + } pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pData->dnodeHash == NULL) { diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 4769ef8538..504c405506 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -109,6 +109,7 @@ typedef struct { SMsgCb msgCb; bool validMnodeEps; int64_t ipWhiteVer; + char machineId[TSDB_MACHINE_ID_LEN + 1]; } SDnodeData; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fc58fabad5..4a092057ce 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -464,6 +464,7 @@ typedef struct { char* pAst1; char* pAst2; SRWLatch lock; + int8_t source; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 1084340dc2..aed49809dd 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -119,6 +119,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj * int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); void destroyStreamTaskIter(SStreamTaskIter *pIter); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index cb8a24e675..c65c228224 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -141,7 +141,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN); taosMemoryFreeClear(machineId); } else { -#ifdef TD_ENTERPRISE +#if defined(TD_ENTERPRISE) && !defined(GRANTS_CFG) terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE; goto _OVER; #endif @@ -410,9 +410,6 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) { dInfo.ep.port = pDnode->port; dInfo.offlineReason = pDnode->offlineReason; tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - tstrncpy(dInfo.active, pDnode->active, TSDB_ACTIVE_KEY_LEN); - tstrncpy(dInfo.connActive, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN); - tstrncpy(dInfo.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1); sdbRelease(pSdb, pDnode); if (mndIsMnode(pMnode, pDnode->id)) { dInfo.isMnode = 1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index cbc0ace75d..ef9a7205e1 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) { } } -static bool countWindowStreamTask(SSubplan* pPlan) { - SPhysiNode* pNode = pPlan->pNode; - return hasCountWindowNode(pNode); +static bool isCountWindowStreamTask(SSubplan* pPlan) { + return hasCountWindowNode((SPhysiNode*)pPlan->pNode); } int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, @@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe } } -static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) { - bool hasCountWindowNode = countWindowStreamTask(pPlan); - bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0); - if (hasCountWindowNode && isRelStreamTask) { +static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) { + bool hasCountWindowNode = isCountWindowStreamTask(pPlan); + + if (hasCountWindowNode && (!isFillhistoryTask)) { SStreamStatus* pStatus = &pTask->status; - mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set", - pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT)); + mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set", + pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus)); pStatus->taskStatus = TASK_STATUS__HALT; } } @@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) { static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { - // new stream task SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } + mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); - haltInitialTaskStatus(pTask, plan); + if (pStream->conf.fillHistory) { + haltInitialTaskStatus(pTask, plan, isFillhistory); + } streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); @@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre terrno = code; return terrno; } + return TDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1ddd2f34e6..aad1dc881b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -458,6 +458,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3 req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.alterOriData = alterOriData; req.alterOriDataLen = alterOriDataLen; + req.source = pStb->source; // todo req.schemaRow.nCols = pStb->numOfColumns; req.schemaRow.version = pStb->colVer; @@ -774,7 +775,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->createdTime = taosGetTimestampMs(); pDst->updateTime = pDst->createdTime; pDst->uid = - (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + (pCreate->source == TD_REQ_FROM_TAOX_OLD || pCreate->source == TD_REQ_FROM_TAOX) + ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->tagVer = 1; pDst->colVer = 1; @@ -790,6 +792,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->numOfFuncs = pCreate->numOfFuncs; pDst->commentLen = pCreate->commentLen; pDst->pFuncs = pCreate->pFuncs; + pDst->source = pCreate->source; pCreate->pFuncs = NULL; if (pDst->commentLen > 0) { @@ -1033,6 +1036,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq memcpy(pDst, pStb, sizeof(SStbObj)); taosRUnLockLatch(&pStb->lock); + pDst->source = createReq->source; pDst->updateTime = taosGetTimestampMs(); pDst->numOfColumns = createReq->numOfColumns; pDst->numOfTags = createReq->numOfTags; @@ -1141,7 +1145,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { goto _OVER; - } else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) { + } else if ((createReq.source == TD_REQ_FROM_TAOX_OLD || createReq.source == TD_REQ_FROM_TAOX) && (createReq.tagVer != 1 || createReq.colVer != 1)) { mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); code = 0; goto _OVER; @@ -2572,7 +2576,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { } } - if (dropReq.source == TD_REQ_FROM_TAOX && pStb->uid != dropReq.suid) { + if ((dropReq.source == TD_REQ_FROM_TAOX_OLD || dropReq.source == TD_REQ_FROM_TAOX) && pStb->uid != dropReq.suid) { code = 0; goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index fc472dabe5..1d296a1c6e 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -69,7 +69,7 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI taosArrayPush(pList, pInfo); } -int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); if (pTrans == NULL) { return terrno; @@ -119,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in } else { mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, pStream->uid, transId); - code = createStreamResetStatusTrans(pMnode, pStream); + code = mndCreateStreamResetStatusTrans(pMnode, pStream); } } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 5bfd3933b5..2241d93465 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -261,22 +261,30 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg return mndTransAppendRedoAction(pTrans, &action); } +static bool identicalName(const char* pDb, const char* pParam, int32_t len) { + return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0); +} + int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { - // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); - if (pTransInfo == NULL) { - return TSDB_CODE_SUCCESS; - } + void *pIter = NULL; - // not checkpoint trans, ignore - if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { - mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); - return TSDB_CODE_SUCCESS; - } + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; + if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { + continue; + } - char *pDupDBName = strndup(pDBName, len); - mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); - taosMemoryFree(pDupDBName); + SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId); + if (pStream != NULL) { + if (identicalName(pStream->sourceDb, pDBName, len)) { + mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb); + } else if (identicalName(pStream->targetDb, pDBName, len)) { + mndKillTransImpl(pMnode, pTransInfo->transId, pStream->targetDb); + } + + mndReleaseStream(pMnode, pStream); + } + } return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 3cabce2201..1ae85a2cc6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -231,6 +231,8 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT taosMemoryFree(pReq); return -1; } + + mDebug("set the resume action for trans:%d", pTrans->id); return 0; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 5e5a3626a4..8a2313370a 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -2484,7 +2484,7 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false); - char objName[20] = {0}; + char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(objName, "all", pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)objName, false); diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index e3bfdb5d6c..8d106b1ede 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -29,6 +29,9 @@ #include "../../inc/mndStream.h" namespace { + +static int64_t defStreamId = 999; + SRpcMsg buildHbReq() { SStreamHbMsg msg = {0}; msg.vgId = 1; @@ -40,7 +43,7 @@ SRpcMsg buildHbReq() { entry.nodeId = i + 1; entry.stage = 1; entry.id.taskId = i + 1; - entry.id.streamId = 999; + entry.id.streamId = defStreamId; if (i == 0) { entry.stage = 4; @@ -57,7 +60,7 @@ SRpcMsg buildHbReq() { entry.stage = 1; entry.id.taskId = 5; - entry.id.streamId = 999; + entry.id.streamId = defStreamId; entry.checkpointId = 1; entry.checkpointFailed = true; @@ -118,15 +121,16 @@ void setTask(SStreamTask* pTask, int32_t nodeId, int64_t streamId, int32_t taskI taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); taosArrayPush(pExecNode->pTaskList, &id); } + void initStreamExecInfo() { SStreamExecInfo* pExecNode = &execInfo; SStreamTask task = {0}; - setTask(&task, 1, 999, 1); - setTask(&task, 1, 999, 2); - setTask(&task, 1, 999, 3); - setTask(&task, 1, 999, 4); - setTask(&task, 2, 999, 5); + setTask(&task, 1, defStreamId, 1); + setTask(&task, 1, defStreamId, 2); + setTask(&task, 1, defStreamId, 3); + setTask(&task, 1, defStreamId, 4); + setTask(&task, 2, defStreamId, 5); } void initNodeInfo() { @@ -138,38 +142,112 @@ void initNodeInfo() { } } // namespace +class StreamTest : public testing::Test { // 继承了 testing::Test + protected: + + static void SetUpTestSuite() { + mndInitExecInfo(); + initStreamExecInfo(); + initNodeInfo(); + + std::cout<<"setup env for streamTest suite"<(taosMemoryCalloc(1, sizeof(SMnode))); + {// init sdb + SSdbOpt opt = {0}; + opt.path = pMnode->path; + opt.pMnode = pMnode; + opt.pWal = pMnode->pWal; + + pMnode->pSdb = sdbInit(&opt); + taosThreadMutexInit(&pMnode->syncMgmt.lock, NULL); + } + + SVgroupChangeInfo info; + info.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)); + info.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + + const char* pDbName = "test_db_name"; + int32_t len = strlen(pDbName); + + taosHashPut(info.pDBMap, pDbName, len, NULL, 0); + + killAllCheckpointTrans(pMnode, &info); + + SStreamObj stream; + memset(&stream, 0, sizeof(SStreamObj)); + + stream.uid = defStreamId; + stream.lock = 0; + stream.tasks = taosArrayInit(1, POINTER_BYTES); + stream.pHTasksList = taosArrayInit(1, POINTER_BYTES); + + SArray* pLevel = taosArrayInit(1, POINTER_BYTES); + SStreamTask* pTask = static_cast(taosMemoryCalloc(1, sizeof(SStreamTask))); + pTask->id.streamId = defStreamId; + pTask->id.taskId = 1; + pTask->exec.qmsg = (char*)taosMemoryMalloc(1); + taosThreadMutexInit(&pTask->lock, NULL); + + taosArrayPush(pLevel, &pTask); + + taosArrayPush(stream.tasks, &pLevel); + mndCreateStreamResetStatusTrans(pMnode, &stream); + + tFreeStreamObj(&stream); + sdbCleanup(pMnode->pSdb); + + taosMemoryFree(pMnode); + + taosArrayDestroy(info.pUpdateNodeList); + taosHashCleanup(info.pDBMap); +} + +TEST_F(StreamTest, plan_Test) { char* ast = "{\"NodeType\":\"101\",\"Name\":\"SelectStmt\",\"SelectStmt\":{\"Distinct\":false,\"Projections\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_1\",\"UserAlias\":\"_wstart\",\"Name\":\"_wstart\",\"Id\":\"89\",\"Type\":\"3505\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_2\",\"UserAlias\":\"sum(voltage)\",\"Name\":\"sum\",\"Id\":\"1\",\"Type\":\"14\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"voltage\",\"UserAlias\":\"voltage\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"voltage\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"groupid\",\"Name\":\"_group_key\",\"Id\":\"96\",\"Type\":\"3754\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"groupid\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"2\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"groupid\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}}],\"From\":{\"NodeType\":\"6\",\"Name\":\"RealTable\",\"RealTable\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"0\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"DbName\":\"test\",\"tableName\":\"meters\",\"tableAlias\":\"meters\",\"MetaSize\":\"475\",\"Meta\":{\"VgId\":\"0\",\"TableType\":\"1\",\"Uid\":\"6555383776122680534\",\"Suid\":\"6555383776122680534\",\"Sversion\":\"1\",\"Tversion\":\"1\",\"ComInfo\":{\"NumOfTags\":\"2\",\"Precision\":\"0\",\"NumOfColumns\":\"4\",\"RowSize\":\"20\"},\"ColSchemas\":[{\"Type\":\"9\",\"ColId\":\"1\",\"bytes\":\"8\",\"Name\":\"ts\"},{\"Type\":\"6\",\"ColId\":\"2\",\"bytes\":\"4\",\"Name\":\"current\"},{\"Type\":\"4\",\"ColId\":\"3\",\"bytes\":\"4\",\"Name\":\"voltage\"},{\"Type\":\"6\",\"ColId\":\"4\",\"bytes\":\"4\",\"Name\":\"phase\"},{\"Type\":\"4\",\"ColId\":\"5\",\"bytes\":\"4\",\"Name\":\"groupid\"},{\"Type\":\"8\",\"ColId\":\"6\",\"bytes\":\"26\",\"Name\":\"location\"}]},\"VgroupsInfoSize\":\"1340\",\"VgroupsInfo\":{\"Num\":\"2\",\"Vgroups\":[{\"VgId\":\"2\",\"HashBegin\":\"0\",\"HashEnd\":\"2147483646\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"},{\"VgId\":\"3\",\"HashBegin\":\"2147483647\",\"HashEnd\":\"4294967295\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"}]}}},\"PartitionBy\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"groupid\",\"UserAlias\":\"groupid\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"2\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"groupid\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"Window\":{\"NodeType\":\"14\",\"Name\":\"IntervalWindow\",\"IntervalWindow\":{\"Interval\":{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"115\",\"Bytes\":\"8\"},\"AliasName\":\"c804c3a15ebe05b5baf40ad5ee12be1f\",\"UserAlias\":\"2s\",\"LiteralSize\":\"2\",\"Literal\":\"2s\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"115\",\"Datum\":\"2000\"}},\"TsPk\":{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"6555383776122680534\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"meters\",\"TableAlias\":\"meters\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}}},\"StmtName\":\"0x1580095ba\",\"HasAggFuncs\":true}}"; // char* ast = "{\"NodeType\":\"101\",\"Name\":\"SelectStmt\",\"SelectStmt\":{\"Distinct\":false,\"Projections\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_1\",\"UserAlias\":\"wstart\",\"Name\":\"_wstart\",\"Id\":\"89\",\"Type\":\"3505\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"#expr_2\",\"UserAlias\":\"min(c1)\",\"Name\":\"min\",\"Id\":\"2\",\"Type\":\"8\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"c1\",\"UserAlias\":\"c1\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"2\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c1\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"3\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"#expr_3\",\"UserAlias\":\"max(c2)\",\"Name\":\"max\",\"Id\":\"3\",\"Type\":\"7\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"3\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"c2\",\"UserAlias\":\"c2\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c2\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_4\",\"UserAlias\":\"sum(c3)\",\"Name\":\"sum\",\"Id\":\"1\",\"Type\":\"14\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"4\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"c3\",\"UserAlias\":\"c3\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"4\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c3\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_5\",\"UserAlias\":\"first(c4)\",\"Name\":\"first\",\"Id\":\"33\",\"Type\":\"504\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c4\",\"UserAlias\":\"c4\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"5\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c4\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}},{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"11\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"#expr_6\",\"UserAlias\":\"last(c5)\",\"Name\":\"last\",\"Id\":\"36\",\"Type\":\"506\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"11\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"c5\",\"UserAlias\":\"c5\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"6\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c5\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}},{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"12\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_7\",\"UserAlias\":\"apercentile(c6, 50)\",\"Name\":\"apercentile\",\"Id\":\"12\",\"Type\":\"1\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"12\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"c6\",\"UserAlias\":\"c6\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"7\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c6\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c0c7c76d30bd3dcaefc96f40275bdc0a\",\"UserAlias\":\"50\",\"LiteralSize\":\"2\",\"Literal\":\"50\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"50\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"13\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_8\",\"UserAlias\":\"avg(c7)\",\"Name\":\"avg\",\"Id\":\"8\",\"Type\":\"2\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"13\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"c7\",\"UserAlias\":\"c7\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"8\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c7\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"14\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_9\",\"UserAlias\":\"count(c8)\",\"Name\":\"count\",\"Id\":\"0\",\"Type\":\"3\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"14\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c8\",\"UserAlias\":\"c8\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"9\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c8\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"6\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"4\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_10\",\"UserAlias\":\"spread(c1)\",\"Name\":\"spread\",\"Id\":\"17\",\"Type\":\"11\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"c1\",\"UserAlias\":\"c1\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"2\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c1\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"7\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_11\",\"UserAlias\":\"stddev(c2)\",\"Name\":\"stddev\",\"Id\":\"4\",\"Type\":\"12\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"3\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"2\"},\"AliasName\":\"c2\",\"UserAlias\":\"c2\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"3\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c2\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"8\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_12\",\"UserAlias\":\"hyperloglog(c11)\",\"Name\":\"hyperloglog\",\"Id\":\"43\",\"Type\":\"17\",\"Parameters\":[{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"8\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c11\",\"UserAlias\":\"c11\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"12\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"c11\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"10\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"26\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"#expr_13\",\"UserAlias\":\"timediff(1, 0, 1h)\",\"Name\":\"timediff\",\"Id\":\"81\",\"Type\":\"2501\",\"Parameters\":[{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"c4ca4238a0b923820dcc509a6f75849b\",\"UserAlias\":\"1\",\"LiteralSize\":\"1\",\"Literal\":\"1\",\"Duration\":false,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"1\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"cfcd208495d565ef66e7dff9f98764da\",\"UserAlias\":\"0\",\"LiteralSize\":\"1\",\"Literal\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"104\",\"Bytes\":\"8\"},\"AliasName\":\"7c68645d71b803bf0ba2f22519f73e08\",\"UserAlias\":\"1h\",\"LiteralSize\":\"2\",\"Literal\":\"1h\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"104\",\"Datum\":\"3600000\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}},{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"1\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"Name\":\"cast\",\"Id\":\"77\",\"Type\":\"2000\",\"Parameters\":[{\"NodeType\":\"5\",\"Name\":\"Function\",\"Function\":{\"DataType\":{\"Type\":\"8\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"96\"},\"AliasName\":\"#expr_14\",\"UserAlias\":\"timezone()\",\"Name\":\"timezone\",\"Id\":\"84\",\"Type\":\"2503\",\"UdfBufSize\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"2\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":true,\"NotReserved\":true,\"IsNull\":false,\"Unit\":\"0\",\"Datum\":\"0\"}}],\"UdfBufSize\":\"0\"}}],\"From\":{\"NodeType\":\"6\",\"Name\":\"RealTable\",\"RealTable\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"0\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"DbName\":\"test\",\"tableName\":\"at_once_interval_ext_stb\",\"tableAlias\":\"at_once_interval_ext_stb\",\"MetaSize\":\"2008\",\"Meta\":{\"VgId\":\"0\",\"TableType\":\"1\",\"Uid\":\"5129202035162885657\",\"Suid\":\"5129202035162885657\",\"Sversion\":\"1\",\"Tversion\":\"1\",\"ComInfo\":{\"NumOfTags\":\"13\",\"Precision\":\"0\",\"NumOfColumns\":\"14\",\"RowSize\":\"85\"},\"ColSchemas\":[{\"Type\":\"9\",\"ColId\":\"1\",\"bytes\":\"8\",\"Name\":\"ts\"},{\"Type\":\"2\",\"ColId\":\"2\",\"bytes\":\"1\",\"Name\":\"c1\"},{\"Type\":\"3\",\"ColId\":\"3\",\"bytes\":\"2\",\"Name\":\"c2\"},{\"Type\":\"4\",\"ColId\":\"4\",\"bytes\":\"4\",\"Name\":\"c3\"},{\"Type\":\"5\",\"ColId\":\"5\",\"bytes\":\"8\",\"Name\":\"c4\"},{\"Type\":\"11\",\"ColId\":\"6\",\"bytes\":\"1\",\"Name\":\"c5\"},{\"Type\":\"12\",\"ColId\":\"7\",\"bytes\":\"2\",\"Name\":\"c6\"},{\"Type\":\"13\",\"ColId\":\"8\",\"bytes\":\"4\",\"Name\":\"c7\"},{\"Type\":\"14\",\"ColId\":\"9\",\"bytes\":\"8\",\"Name\":\"c8\"},{\"Type\":\"6\",\"ColId\":\"10\",\"bytes\":\"4\",\"Name\":\"c9\"},{\"Type\":\"7\",\"ColId\":\"11\",\"bytes\":\"8\",\"Name\":\"c10\"},{\"Type\":\"8\",\"ColId\":\"12\",\"bytes\":\"8\",\"Name\":\"c11\"},{\"Type\":\"10\",\"ColId\":\"13\",\"bytes\":\"26\",\"Name\":\"c12\"},{\"Type\":\"1\",\"ColId\":\"14\",\"bytes\":\"1\",\"Name\":\"c13\"},{\"Type\":\"2\",\"ColId\":\"15\",\"bytes\":\"1\",\"Name\":\"t1\"},{\"Type\":\"3\",\"ColId\":\"16\",\"bytes\":\"2\",\"Name\":\"t2\"},{\"Type\":\"4\",\"ColId\":\"17\",\"bytes\":\"4\",\"Name\":\"t3\"},{\"Type\":\"5\",\"ColId\":\"18\",\"bytes\":\"8\",\"Name\":\"t4\"},{\"Type\":\"11\",\"ColId\":\"19\",\"bytes\":\"1\",\"Name\":\"t5\"},{\"Type\":\"12\",\"ColId\":\"20\",\"bytes\":\"2\",\"Name\":\"t6\"},{\"Type\":\"13\",\"ColId\":\"21\",\"bytes\":\"4\",\"Name\":\"t7\"},{\"Type\":\"14\",\"ColId\":\"22\",\"bytes\":\"8\",\"Name\":\"t8\"},{\"Type\":\"6\",\"ColId\":\"23\",\"bytes\":\"4\",\"Name\":\"t9\"},{\"Type\":\"7\",\"ColId\":\"24\",\"bytes\":\"8\",\"Name\":\"t10\"},{\"Type\":\"8\",\"ColId\":\"25\",\"bytes\":\"8\",\"Name\":\"t11\"},{\"Type\":\"10\",\"ColId\":\"26\",\"bytes\":\"26\",\"Name\":\"t12\"},{\"Type\":\"1\",\"ColId\":\"27\",\"bytes\":\"1\",\"Name\":\"t13\"}]},\"VgroupsInfoSize\":\"1340\",\"VgroupsInfo\":{\"Num\":\"2\",\"Vgroups\":[{\"VgId\":\"14\",\"HashBegin\":\"0\",\"HashEnd\":\"2147483646\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"},{\"VgId\":\"15\",\"HashBegin\":\"2147483647\",\"HashEnd\":\"4294967295\",\"EpSet\":{\"InUse\":\"0\",\"NumOfEps\":\"1\",\"Eps\":[{\"Fqdn\":\"localhost\",\"Port\":\"6030\"}]},\"NumOfTable\":\"0\"}]}}},\"Tags\":[{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}},{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"0\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"1\"},\"AliasName\":\"\",\"UserAlias\":\"\",\"LiteralSize\":\"0\",\"Duration\":false,\"Translate\":false,\"NotReserved\":false,\"IsNull\":true,\"Unit\":\"0\"}}],\"Window\":{\"NodeType\":\"14\",\"Name\":\"IntervalWindow\",\"IntervalWindow\":{\"Interval\":{\"NodeType\":\"2\",\"Name\":\"Value\",\"Value\":{\"DataType\":{\"Type\":\"5\",\"Precision\":\"0\",\"Scale\":\"115\",\"Bytes\":\"8\"},\"AliasName\":\"1fd7635317edfeca9054894ac9ef9b5e\",\"UserAlias\":\"14s\",\"LiteralSize\":\"3\",\"Literal\":\"14s\",\"Duration\":true,\"Translate\":true,\"NotReserved\":false,\"IsNull\":false,\"Unit\":\"115\",\"Datum\":\"14000\"}},\"TsPk\":{\"NodeType\":\"1\",\"Name\":\"Column\",\"Column\":{\"DataType\":{\"Type\":\"9\",\"Precision\":\"0\",\"Scale\":\"0\",\"Bytes\":\"8\"},\"AliasName\":\"ts\",\"UserAlias\":\"ts\",\"TableId\":\"5129202035162885657\",\"TableType\":\"1\",\"ColId\":\"1\",\"ProjId\":\"0\",\"ColType\":\"1\",\"DbName\":\"test\",\"TableName\":\"at_once_interval_ext_stb\",\"TableAlias\":\"at_once_interval_ext_stb\",\"ColName\":\"ts\",\"DataBlockId\":\"0\",\"SlotId\":\"0\"}}}},\"StmtName\":\"0x150146d14\",\"HasAggFuncs\":true}}"; SNode * pAst = NULL; SQueryPlan *pPlan = NULL; if (taosCreateLog("taoslog", 10, "/etc/taos", NULL, NULL, NULL, NULL, 1) != 0) { - // ignore create log failed, only print - printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); + // ignore create log failed, only print + printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); } if (nodesStringToNode(ast, &pAst) < 0) { - ASSERT(0); + ASSERT(0); } - SPlanContext cxt = { 0 }; + SPlanContext cxt = {0}; cxt.pAstRoot = pAst; cxt.topicQuery = false; cxt.streamQuery = true; @@ -181,10 +259,11 @@ TEST(testCase, plan_Test) { // using ast and param to build physical plan if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - ASSERT(0); + ASSERT(0); } + if (pAst != NULL) nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); + nodesDestroyNode((SNode*)pPlan); } #pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 54d859992c..fb898c02f8 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * continue; } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE}; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 458a8584e3..d1074daf81 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -392,7 +392,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { pReader->msg.ver); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if ((pSubmitTbData->source & sourceExcluded) != 0) { + if ((pSubmitTbData->flags & sourceExcluded) != 0) { pReader->nextBlk += 1; continue; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index f108cedcf5..103007eb57 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -267,7 +267,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table; } - if ((pSubmitTbDataRet->source & sourceExcluded) != 0) { + if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { goto loop_table; } if (pRsp->withTbName) { @@ -335,7 +335,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db; } - if ((pSubmitTbDataRet->source & sourceExcluded) != 0) { + if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { goto loop_db; } if (pRsp->withTbName) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 844351a44f..5411ade010 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -816,7 +816,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { continue; @@ -861,7 +861,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { pTask->execInfo.sink.numOfBlocks += 1; uint64_t groupId = pDataBlock->info.id.groupId; - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL}; + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); if (index == NULL) { // no data yet, append it diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 31267dbf52..dad1211326 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -171,6 +171,22 @@ end : { } } +#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC) \ + SDecoder decoder = {0};\ + TYPE req = {0}; \ + void* data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)); \ + int32_t len = pHead->bodyLen - sizeof(SMsgHead); \ + tDecoderInit(&decoder, data, len); \ + if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) { \ + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 " msgType %d", \ + pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); \ + fetchVer++; \ + tDecoderClear(&decoder); \ + continue; \ + } \ + tDecoderClear(&decoder); + + static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { int code = 0; @@ -239,6 +255,19 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } + if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) { + if (pHead->msgType == TDMT_VND_CREATE_TABLE) { + PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq) + } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) { + PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq) + } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { + PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) + } else if (pHead->msgType == TDMT_VND_DELETE) { + fetchVer++; + continue; + } + } + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1); metaRsp.resMsgType = pHead->msgType; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 8e1a6bd319..69da849d37 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2555,7 +2555,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr while (1) { // only check here, since the iterate data in memory is very fast. if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } @@ -2707,7 +2707,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } @@ -2922,7 +2922,7 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_ while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } @@ -2964,7 +2964,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { - tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); + tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", tstrerror(pReader->code), pReader->idStr); return pReader->code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 4fc7a88494..d1c811858a 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "vnd.h" #include "tsdb.h" +#include "vnd.h" #define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ do { \ @@ -49,7 +49,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { // decode req if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) { code = TSDB_CODE_INVALID_MSG; - goto _exit; + goto _exit4; } metaRsp.dbId = pVnode->config.dbId; @@ -59,7 +59,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); code = vnodeValidateTableHash(pVnode, tableFName); if (code) { - goto _exit; + goto _exit4; } // query meta @@ -67,7 +67,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) { code = terrno; - goto _exit; + goto _exit3; } metaRsp.tableType = mer1.me.type; @@ -81,7 +81,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { metaRsp.suid = mer1.me.uid; } else if (mer1.me.type == TSDB_CHILD_TABLE) { metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); - if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; + if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2; strcpy(metaRsp.stbName, mer2.me.name); metaRsp.suid = mer2.me.uid; @@ -125,6 +125,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); _exit: + taosMemoryFree(metaRsp.pSchemas); +_exit2: + metaReaderClear(&mer2); +_exit3: + metaReaderClear(&mer1); +_exit4: rpcMsg.info = pMsg->info; rpcMsg.pCont = pRsp; rpcMsg.contLen = rspLen; @@ -141,9 +147,6 @@ _exit: *pMsg = rpcMsg; } - taosMemoryFree(metaRsp.pSchemas); - metaReaderClear(&mer2); - metaReaderClear(&mer1); return TSDB_CODE_SUCCESS; } @@ -706,5 +709,5 @@ void *vnodeGetIvtIdx(void *pVnode) { } int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) { - return tsdbGetTableSchema(((SVnode*)pVnode)->pMeta, uid, pSchema, suid); + return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 693f325556..8cc2f72adb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3424,12 +3424,6 @@ _error: // table merge scan operator // table merge scan operator -// TODO: limit / duration optimization -// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish -// TODO: error processing, memory freeing -// TODO: add log for error and perf -// TODO: tsdb reader open/close dynamically -// TODO: blockdata deep cleanup static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) { int32_t left = *(int32_t*)pLeft; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 080f9d4e2b..294c2730df 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -99,6 +99,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, (void**)&pCurWin->winInfo.pStatePos, &size); } } + if (ts < pCurWin->winInfo.sessionWin.win.ekey) { + pBuffInfo->rebuildWindow = true; + } } else { code = pAggSup->stateStore.streamStateCountWinAddIfNotExist( pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); @@ -115,8 +118,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } } -static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, - SSHashObj* pStDeleted, bool* pRebuild) { +static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { + SSessionKey key = {0}; + getSessionHashKey(pKey, &key); + tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); + tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); +} + +static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, + int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated, + SSHashObj* pStDeleted, bool* pRebuild) { SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; int32_t num = 0; for (int32_t i = start; i < rows; i++) { @@ -148,6 +159,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI if (needDelState) { memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); + removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey); if (pWinInfo->winInfo.pStatePos->needFree) { pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey); } @@ -242,7 +254,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl setSessionWinOutputInfo(pStUpdated, &curWin.winInfo); slidingRows = *curWin.pWindowCount; if (!buffInfo.rebuildWindow) { - winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow); + winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, + pStDeleted, &buffInfo.rebuildWindow); } if (buffInfo.rebuildWindow) { SSessionKey range = {0}; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index fde5f22aaa..9ff903cdb9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1125,8 +1125,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t firstRowTs = *(int64_t*)tsCol->pData; if ((pOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || (pOrder->order == TSDB_ORDER_DESC && firstRowTs < pHandle->currMergeLimitTs)) { - continue; - } + if (bExtractedBlock) { + blockDataDestroy(pBlk); + } + continue; + } } if (pBlk != NULL) { @@ -1149,10 +1152,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { tSimpleHashClear(mUidBlk); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { - tSimpleHashCleanup(mUidBlk); - taosArrayDestroy(aBlkSort); - taosArrayDestroy(aExtSrc); - return code; + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + break; } int64_t el = taosGetTimestampUs() - p; @@ -1165,6 +1169,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { szSort = 0; qDebug("source %zu created", taosArrayGetSize(aExtSrc)); } + if (pBlk == NULL) { break; } @@ -1180,6 +1185,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } tSimpleHashCleanup(mUidBlk); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); if (!tsortIsClosed(pHandle)) { @@ -1188,7 +1196,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayDestroy(aExtSrc); tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; - return TSDB_CODE_SUCCESS; + return code; } static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index 1ee835573b..eff74f93d2 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -175,4 +175,8 @@ IF(NOT TD_DARWIN) NAME idxFstUT COMMAND idxFstUT ) + add_test( + NAME idxFstTest + COMMAND idxFstTest + ) ENDIF () diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index b889a2209a..adf783d2eb 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -1,4 +1,5 @@ +#include #include #include #include @@ -14,6 +15,12 @@ #include "tutil.h" void* callback(void* s) { return s; } +class FstEnv : public ::testing::Test { + protected: + virtual void SetUp() {} + virtual void TearDown() {} +}; + static std::string fileName = TD_TMP_DIR_PATH "tindex.tindex"; class FstWriter { public: @@ -154,7 +161,7 @@ class FstReadMemory { int32_t _size; }; -#define L 100 +#define L 200 #define M 100 #define N 100 @@ -200,7 +207,7 @@ void checkMillonWriteAndReadOfFst() { FstWriter* fw = new FstWriter; Performance_fstWriteRecords(fw); delete fw; - FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024); + FstReadMemory* fr = new FstReadMemory(1024 * 8 * 1024); if (fr->init()) { printf("success to init fst read"); @@ -637,23 +644,31 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) { tfileIteratorDestroy(iter); } -int main(int argc, char* argv[]) { - // tool to check all kind of fst test - // if (argc > 1) { validateTFile(argv[1]); } - // if (argc > 4) { - // path suid colName ver - // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); - //} - checkFstCheckIterator1(); - // checkFstCheckIterator2(); - // checkFstCheckIteratorPrefix(); - // checkFstCheckIteratorRange1(); - // checkFstCheckIteratorRange2(); - // checkFstCheckIteratorRange3(); - // checkFstLongTerm(); - // checkFstPrefixSearch(); +// int main(int argc, char* argv[]) { +// // tool to check all kind of fst test +// // if (argc > 1) { validateTFile(argv[1]); } +// // if (argc > 4) { +// // path suid colName ver +// // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); +// //} +// checkFstCheckIterator1(); +// // checkFstCheckIterator2(); +// // checkFstCheckIteratorPrefix(); +// // checkFstCheckIteratorRange1(); +// // checkFstCheckIteratorRange2(); +// // checkFstCheckIteratorRange3(); +// // checkFstLongTerm(); +// // checkFstPrefixSearch(); - // checkMillonWriteAndReadOfFst(); +// // checkMillonWriteAndReadOfFst(); - return 1; -} +// return 1; +// } +TEST_F(FstEnv, checkIterator1) { checkFstCheckIterator1(); } +TEST_F(FstEnv, checkItertor2) { checkFstCheckIterator2(); } +TEST_F(FstEnv, checkPrefix) { checkFstCheckIteratorPrefix(); } +TEST_F(FstEnv, checkRange1) { checkFstCheckIteratorRange1(); } +TEST_F(FstEnv, checkRange2) { checkFstCheckIteratorRange2(); } +TEST_F(FstEnv, checkRange3) { checkFstCheckIteratorRange3(); } +TEST_F(FstEnv, checkLongTerm) { checkFstLongTerm(); } +TEST_F(FstEnv, checkMillonWriteData) { checkMillonWriteAndReadOfFst(); } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b6a447d1dc..6f22a7c232 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -698,7 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanGroupTags = "GroupTags"; static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited"; -static const char* jkScanLogicPlanparaTablesSort = "paraTablesSort"; +static const char* jkScanLogicPlanParaTablesSort = "ParaTablesSort"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -747,7 +747,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->paraTablesSort); + code = tjsonAddBoolToObject(pJson, jkScanLogicPlanParaTablesSort, pNode->paraTablesSort); } return code; } @@ -800,7 +800,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->paraTablesSort); + code = tjsonGetBoolValue(pJson, jkScanLogicPlanParaTablesSort, &pNode->paraTablesSort); } return code; } @@ -1895,7 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited"; static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable"; -static const char* jkTableScanPhysiPlanparaTablesSort = "paraTablesSort"; +static const char* jkTableScanPhysiPlanParaTablesSort = "ParaTablesSort"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1971,7 +1971,7 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanparaTablesSort, pNode->paraTablesSort); + code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanParaTablesSort, pNode->paraTablesSort); } return code; } @@ -2050,7 +2050,7 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanparaTablesSort, &pNode->paraTablesSort); + code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanParaTablesSort, &pNode->paraTablesSort); } return code; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 0113cde035..3f87f79301 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -285,7 +285,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) { pTmp->suid = pSrc->suid; pTmp->uid = pSrc->uid; pTmp->sver = pSrc->sver; - pTmp->source = pSrc->source; pTmp->pCreateTbReq = NULL; if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (pSrc->pCreateTbReq) { @@ -654,7 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } - pTableCxt->pData->source = SOURCE_TAOSX; + pTableCxt->pData->flags |= TD_REQ_FROM_TAOX; if(tmp == NULL){ ret = initTableColSubmitData(pTableCxt); if (ret != TSDB_CODE_SUCCESS) { @@ -722,7 +721,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[j]); } else { pStart += colLength[j]; @@ -753,7 +752,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[i]); } else { pStart += colLength[i]; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index bd28d2bca9..3d0241df75 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -488,6 +488,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS void* pFileStore = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateSessionSeekKeyPrev_rocksdb(pFileStore, pWinKey); if (pCur) { + pCur->pStreamFileState = pFileState; SSessionKey key = {0}; void* pVal = NULL; int len = 0; @@ -736,6 +737,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C void* pRockVal = NULL; SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen); + streamStateFreeCur(pCur); if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); if (code == TSDB_CODE_SUCCESS) { @@ -743,7 +745,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); - streamStateFreeCur(pCur); goto _end; } } @@ -751,7 +752,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C pWinKey->win.ekey = endTs; (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); taosMemoryFree(pRockVal); - streamStateFreeCur(pCur); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); code = TSDB_CODE_FAILED; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index bab9ba0cea..e6491639dc 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -1329,7 +1329,6 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { char *data = taosMemoryMalloc(compressSize); gzFile dstFp = NULL; - TdFilePtr pFile = NULL; TdFilePtr pSrcFile = NULL; pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); @@ -1369,8 +1368,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { } cmp_end: - if (pFile) { - taosCloseFile(&pFile); + if (fd >= 0) { + close(fd); } if (pSrcFile) { taosCloseFile(&pSrcFile); diff --git a/source/util/src/tbase58.c b/source/util/src/tbase58.c index fa3ecadd14..5eb72879c3 100644 --- a/source/util/src/tbase58.c +++ b/source/util/src/tbase58.c @@ -18,24 +18,29 @@ #include #include -#define BASE_BUF_SIZE 256 +#define TBASE_BUF_SIZE 256 static const char *basis_58 = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"; char *base58_encode(const uint8_t *value, int32_t vlen) { const uint8_t *pb = value; const uint8_t *pe = pb + vlen; - uint8_t buf[BASE_BUF_SIZE] = {0}; + uint8_t buf[TBASE_BUF_SIZE] = {0}; uint8_t *pbuf = &buf[0]; bool bfree = false; int32_t nz = 0, size = 0, len = 0; + if (vlen > TBASE_MAX_ILEN) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + while (pb != pe && *pb == 0) { ++pb; ++nz; } size = (pe - pb) * 69 / 50 + 1; - if (size > BASE_BUF_SIZE) { + if (size > TBASE_BUF_SIZE) { if (!(pbuf = taosMemoryCalloc(1, size))) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -47,7 +52,7 @@ char *base58_encode(const uint8_t *value, int32_t vlen) { int32_t num = *pb; int32_t i = 0; for (int32_t j = (int32_t)size - 1; (num != 0 || i < len) && j >= 0; --j, ++i) { - num += ((int32_t)buf[j]) << 8; + num += ((int32_t)pbuf[j]) << 8; pbuf[j] = num % 58; num /= 58; } @@ -57,7 +62,7 @@ char *base58_encode(const uint8_t *value, int32_t vlen) { const uint8_t *pi = pbuf + (size - len); while (pi != pbuf + size && *pi == 0) ++pi; - uint8_t *result = taosMemoryCalloc(1, size + 1); + uint8_t *result = taosMemoryCalloc(1, nz + (pbuf + size - pi) + 1); if (!result) { terrno = TSDB_CODE_OUT_OF_MEMORY; if (bfree) taosMemoryFree(pbuf); @@ -82,20 +87,35 @@ static const signed char index_58[256] = { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}; uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) { + const char *pb = value; const char *pe = value + inlen; - uint8_t buf[BASE_BUF_SIZE] = {0}; + uint8_t buf[TBASE_BUF_SIZE] = {0}; uint8_t *pbuf = &buf[0]; bool bfree = false; int32_t nz = 0, size = 0, len = 0; - while (*value && isspace(*value)) ++value; - while (*value == '1') { - ++nz; - ++value; + if (inlen > TBASE_MAX_OLEN) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; } - size = (int32_t)(pe - value) * 733 / 1000 + 1; - if (size > BASE_BUF_SIZE) { + while (pb != pe) { + if (*pb == 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + ++pb; + } + + pb = value; + while (pb != pe && *pb && isspace(*pb)) ++pb; + while (pb != pe && *pb == '1') { + ++nz; + ++pb; + } + + size = (int32_t)(pe - pb) * 733 / 1000 + 1; + if (size > TBASE_BUF_SIZE) { if (!(pbuf = taosMemoryCalloc(1, size))) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -103,9 +123,10 @@ uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) { bfree = true; } - while (*value && !isspace(*value)) { - int32_t num = index_58[(uint8_t)*value]; + while (pb != pe && *pb && !isspace(*pb)) { + int32_t num = index_58[(uint8_t)*pb]; if (num == -1) { + terrno = TSDB_CODE_INVALID_PARA; if (bfree) taosMemoryFree(pbuf); return NULL; } @@ -116,18 +137,18 @@ uint8_t *base58_decode(const char *value, size_t inlen, int32_t *outlen) { num >>= 8; } len = i; - ++value; + ++pb; } - while (isspace(*value)) ++value; - if (*value != 0) { + while (pb != pe && isspace(*pb)) ++pb; + if (*pb != 0) { if (bfree) taosMemoryFree(pbuf); return NULL; } const uint8_t *it = pbuf + (size - len); while (it != pbuf + size && *it == 0) ++it; - uint8_t *result = taosMemoryCalloc(1, size + 1); + uint8_t *result = taosMemoryCalloc(1, nz + (pbuf + size - it) + 1); if (!result) { if (bfree) taosMemoryFree(pbuf); terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index eadd9a2413..f90509bec1 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -21,7 +21,7 @@ #include "tutil.h" static int32_t initForwardBackwardPtr(SSkipList *pSkipList); -static SSkipListNode *getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur); +static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, int32_t order, SSkipListNode **pCur); static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListCorrectLevel(SSkipList *pSkipList); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); @@ -131,12 +131,14 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { return pNode; } +#ifdef BUILD_NO_CALL + void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t iterate) { SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; bool hasDup = false; - char *pKey = NULL; - char *pDataKey = NULL; + char * pKey = NULL; + char * pDataKey = NULL; int32_t compare = 0; tSkipListWLock(pSkipList); @@ -260,6 +262,7 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { tSkipListCorrectLevel(pSkipList); tSkipListUnlock(pSkipList); } +#endif SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) { if (pSkipList == NULL) return NULL; @@ -350,6 +353,7 @@ void *tSkipListDestroyIter(SSkipListIterator *iter) { return NULL; } +#ifdef BUILD_NO_CALL void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { if (pSkipList == NULL || pSkipList->level < nlevel || nlevel <= 0) { return; @@ -358,7 +362,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, nlevel - 1); int32_t id = 1; - char *prev = NULL; + char * prev = NULL; while (p != pSkipList->pTail) { char *key = SL_GET_NODE_KEY(pSkipList, p); @@ -392,6 +396,7 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { p = SL_NODE_GET_FORWARD_POINTER(p, nlevel - 1); } } +#endif static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) { for (int32_t i = 0; i < pNode->level; ++i) { @@ -460,7 +465,7 @@ static FORCE_INLINE int32_t tSkipListUnlock(SSkipList *pSkipList) { static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData) { int32_t compare = 0; bool hasDupKey = false; - char *pDataKey = pSkipList->keyFn(pData); + char * pDataKey = pSkipList->keyFn(pData); if (pSkipList->size == 0) { for (int32_t i = 0; i < pSkipList->maxLevel; i++) { @@ -516,6 +521,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, return hasDupKey; } +#ifdef BUILD_NO_CALL static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode) { int32_t level = pNode->level; uint8_t dupMode = SL_DUP_MODE(pSkipList); @@ -540,6 +546,7 @@ static void tSkipListCorrectLevel(SSkipList *pSkipList) { pSkipList->level -= 1; } } +#endif UNUSED_FUNC static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 65eac9c090..6f32453703 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -101,6 +101,22 @@ add_test( COMMAND talgoTest ) +# tbaseCodecTest +add_executable(tbaseCodecTest "tbaseCodecTest.cpp") +target_link_libraries(tbaseCodecTest os util common gtest_main) +add_test( + NAME tbaseCodecTest + COMMAND tbaseCodecTest +) + +# tbaseCodecTest +add_executable(tbaseCodecTest "tbaseCodecTest.cpp") +target_link_libraries(tbaseCodecTest os util common gtest_main) +add_test( + NAME tbaseCodecTest + COMMAND tbaseCodecTest +) + # bufferTest add_executable(bufferTest "bufferTest.cpp") target_link_libraries(bufferTest os util gtest_main) diff --git a/source/util/test/tbaseCodecTest.cpp b/source/util/test/tbaseCodecTest.cpp new file mode 100644 index 0000000000..4c56979885 --- /dev/null +++ b/source/util/test/tbaseCodecTest.cpp @@ -0,0 +1,83 @@ +#include +#include + +#include +#include "os.h" +#include "osTime.h" +#include "taos.h" +#include "taoserror.h" +#include "tbase58.h" +#include "tglobal.h" + +using namespace std; + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +static void checkBase58Codec(uint8_t *pRaw, int32_t rawLen, int32_t index) { + int64_t start = taosGetTimestampUs(); + char *pEnc = base58_encode((const uint8_t *)pRaw, rawLen); + ASSERT_NE(nullptr, pEnc); + + int32_t encLen = strlen(pEnc); + int64_t endOfEnc = taosGetTimestampUs(); + std::cout << "index:" << index << ", encLen is " << encLen << ", cost:" << endOfEnc - start << " us" << std::endl; + int32_t decLen = 0; + char *pDec = (char *)base58_decode((const char *)pEnc, encLen, &decLen); + std::cout << "index:" << index << ", decLen is " << decLen << ", cost:" << taosGetTimestampUs() - endOfEnc << " us" + << std::endl; + ASSERT_NE(nullptr, pDec); + ASSERT_EQ(rawLen, decLen); + ASSERT_LE(rawLen, encLen); + ASSERT_EQ(0, strncmp((char *)pRaw, pDec, rawLen)); + taosMemoryFreeClear(pDec); + taosMemoryFreeClear(pEnc); +} + +TEST(TD_BASE_CODEC_TEST, tbase58_test) { + const int32_t TEST_LEN_MAX = TBASE_MAX_ILEN; + const int32_t TEST_LEN_STEP = 10; + int32_t rawLen = 0; + uint8_t *pRaw = NULL; + + pRaw = (uint8_t *)taosMemoryCalloc(1, TEST_LEN_MAX); + ASSERT_NE(nullptr, pRaw); + + // 1. normal case + // string blend with char and '\0' + rawLen = TEST_LEN_MAX; + for (int32_t i = 0; i < TEST_LEN_MAX; i += 500) { + checkBase58Codec(pRaw, rawLen, i); + pRaw[i] = i & 127; + } + + // string without '\0' + for (int32_t i = 0; i < TEST_LEN_MAX; ++i) { + pRaw[i] = i & 127; + } + checkBase58Codec(pRaw, TEST_LEN_MAX, 0); + for (int32_t i = 0; i < TEST_LEN_MAX; i += 500) { + rawLen = i; + checkBase58Codec(pRaw, rawLen, i); + } + taosMemoryFreeClear(pRaw); + ASSERT_EQ(nullptr, pRaw); + + // 2. overflow case + char tmp[1]; + char *pEnc = base58_encode((const uint8_t *)tmp, TBASE_MAX_ILEN + 1); + ASSERT_EQ(nullptr, pEnc); + char *pDec = (char *)base58_decode((const char *)tmp, TBASE_MAX_OLEN + 1, NULL); + ASSERT_EQ(nullptr, pDec); + + taosMemoryFreeClear(pRaw); + ASSERT_EQ(nullptr, pRaw); +} \ No newline at end of file diff --git a/tests/army/community/cluster/snapshot.py b/tests/army/community/cluster/snapshot.py index b21cbb1ad8..eef650cc77 100644 --- a/tests/army/community/cluster/snapshot.py +++ b/tests/army/community/cluster/snapshot.py @@ -25,6 +25,7 @@ from frame.cases import * from frame.sql import * from frame.caseBase import * from frame import * +from frame.srvCtl import * class TDTestCase(TBase): @@ -65,6 +66,21 @@ class TDTestCase(TBase): sql = f"select avg(dc) from {self.db}.{self.stb}" tdSql.checkFirstValue(sql, 200) + def alterReplica3(self): + sql = f"alter database {self.db} replica 3" + tdSql.execute(sql, show=True) + time.sleep(2) + sc.dnodeStop(2) + sc.dnodeStop(3) + time.sleep(5) + sc.dnodeStart(2) + sc.dnodeStart(3) + + if self.waitTransactionZero() is False: + tdLog.exit(f"{sql} transaction not finished") + return False + return True + def doAction(self): tdLog.info(f"do action.") self.flushDb() @@ -81,7 +97,7 @@ class TDTestCase(TBase): self.alterReplica(1) self.checkAggCorrect() self.compactDb() - self.alterReplica(3) + self.alterReplica3() vgids = self.getVGroup(self.db) selid = random.choice(vgids) diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py index e7bc188ca5..45519d925f 100644 --- a/tests/army/enterprise/s3/s3_basic.py +++ b/tests/army/enterprise/s3/s3_basic.py @@ -28,12 +28,12 @@ from frame import * from frame.eos import * # -# 192.168.1.52 MINIO S3 API KEY: MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa +# 192.168.1.52 MINIO S3 # ''' s3EndPoint http://192.168.1.52:9000 -s3AccessKey MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa +s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX' s3BucketName ci-bucket s3UploadDelaySec 60 ''' @@ -42,7 +42,7 @@ s3UploadDelaySec 60 class TDTestCase(TBase): updatecfgDict = { 's3EndPoint': 'http://192.168.1.52:9000', - 's3AccessKey': 'MQCEIoaPGUs1mhXgpUAu:XTgpN2dEMInnYgqN4gj3G5zgb39ROtsisKKy0GFa', + 's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX', 's3BucketName': 'ci-bucket', 's3BlockSize': '10240', 's3BlockCacheSize': '320', @@ -78,14 +78,27 @@ class TDTestCase(TBase): self.trimDb(True) rootPath = sc.clusterRootPath() - cmd = f"ls {rootPath}/dnode1/data20/vnode/vnode*/tsdb/*.data" + cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data" tdLog.info(cmd) loop = 0 - while len(eos.runRetList(cmd)) > 0 and loop < 40: - time.sleep(5) + rets = [] + while loop < 180: + time.sleep(3) + rets = eos.runRetList(cmd) + cnt = len(rets) + if cnt == 0: + tdLog.info("All data file upload to server over.") + break self.trimDb(True) + tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...") + if loop == 0: + sc.dnodeStop(1) + time.sleep(2) + sc.dnodeStart(1) loop += 1 - tdLog.info(f"loop={loop} wait 5s...") + + if len(rets) > 0: + tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}") def checkStreamCorrect(self): sql = f"select count(*) from {self.db}.stm1" diff --git a/tests/army/frame/srvCtl.py b/tests/army/frame/srvCtl.py index 091856056b..3a9b0cdf4b 100644 --- a/tests/army/frame/srvCtl.py +++ b/tests/army/frame/srvCtl.py @@ -33,14 +33,14 @@ class srvCtl: # control server # - # start + # start idx base is 1 def dnodeStart(self, idx): if clusterDnodes.getModel() == 'cluster': return clusterDnodes.starttaosd(idx) return tdDnodes.starttaosd(idx) - # stop + # stop idx base is 1 def dnodeStop(self, idx): if clusterDnodes.getModel() == 'cluster': return clusterDnodes.stoptaosd(idx) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index cc15df33a1..12f1f62f63 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -49,6 +49,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tms_memleak.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3 diff --git a/tests/system-test/2-query/tms_memleak.py b/tests/system-test/2-query/tms_memleak.py new file mode 100644 index 0000000000..0d5cdd8272 --- /dev/null +++ b/tests/system-test/2-query/tms_memleak.py @@ -0,0 +1,51 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + +class TDTestCase: + def caseDescription(self): + ''' + case1: [TD-] + ''' + return + + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self.conn = conn + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use tms_memleak") + + def run(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists tms_memleak") + tdSql.execute("create database if not exists tms_memleak") + tdSql.execute('use tms_memleak') + + tdSql.execute('create table st(ts timestamp, f int) tags (t int);') + + tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1)('2021-04-19 00:00:02', 2)('2021-04-19 00:00:03', 3)('2021-04-19 00:00:04', 4)") + + tdSql.execute("insert into ct2 using st tags(2) values('2021-04-20 00:00:01', 5)('2021-04-20 00:00:02', 6)('2021-04-20 00:00:03', 7)('2021-04-20 00:00:04', 8)") + + tdSql.execute("insert into ct3 using st tags(3) values('2021-04-21 00:00:01', 5)('2021-04-21 00:00:02', 6)('2021-04-21 00:00:03', 7)('2021-04-21 00:00:04', 8)") + + tdSql.execute("insert into ct4 using st tags(4) values('2021-04-22 00:00:01', 5)('2021-04-22 00:00:02', 6)('2021-04-22 00:00:03', 7)('2021-04-22 00:00:04', 8)") + + tdSql.query("select * from st order by ts limit 1 "); + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1); + + tdSql.execute('drop database tms_memleak') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index ac8a6f377c..c547385d70 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -323,7 +323,7 @@ class TDTestCase: tdSql.query("select * from st") tdSql.checkRows(8) - tdSql.execute(f'create topic topic_excluded with meta as database d1') + tdSql.execute(f'create topic topic_all with meta as database d1') consumer_dict = { "group.id": "g1", "td.connect.user": "root", @@ -333,7 +333,7 @@ class TDTestCase: consumer = Consumer(consumer_dict) try: - consumer.subscribe(["topic_excluded"]) + consumer.subscribe(["topic_all"]) except TmqError: tdLog.exit(f"subscribe error") diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 056b7dc6cf..2257089f06 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -20,6 +20,7 @@ #include #include "taos.h" #include "types.h" +#include "tmsg.h" static int running = 1; TdFilePtr g_fp = NULL; @@ -966,7 +967,14 @@ void testConsumeExcluded(int topic_type){ tmq_raw_data raw = {0}; tmq_get_raw(msg, &raw); if(topic_type == 1){ - assert(raw.raw_type != 2 && raw.raw_type != 4); + assert(raw.raw_type != 2 && raw.raw_type != 4 && + raw.raw_type != TDMT_VND_CREATE_STB && + raw.raw_type != TDMT_VND_ALTER_STB && + raw.raw_type != TDMT_VND_CREATE_TABLE && + raw.raw_type != TDMT_VND_ALTER_TABLE && + raw.raw_type != TDMT_VND_DELETE); + assert(raw.raw_type == TDMT_VND_DROP_STB || + raw.raw_type == TDMT_VND_DROP_TABLE); }else if(topic_type == 2){ assert(0); }