diff --git a/docs/en/12-taos-sql/20-keywords.md b/docs/en/12-taos-sql/20-keywords.md index d563181b87..983d4f63c9 100644 --- a/docs/en/12-taos-sql/20-keywords.md +++ b/docs/en/12-taos-sql/20-keywords.md @@ -178,7 +178,7 @@ The following list shows all reserved keywords: - MATCH - MAX_DELAY -- MAX_SPEED +- BWLIMIT - MAXROWS - MERGE - META diff --git a/docs/en/14-reference/03-connector/09-csharp.mdx b/docs/en/14-reference/03-connector/09-csharp.mdx index 718462295a..203d44fe02 100644 --- a/docs/en/14-reference/03-connector/09-csharp.mdx +++ b/docs/en/14-reference/03-connector/09-csharp.mdx @@ -30,6 +30,10 @@ The source code of `TDengine.Connector` is hosted on [GitHub](https://github.com The supported platforms are the same as those supported by the TDengine client driver. +:::note +Please note TDengine does not support 32bit Windows any more. +::: + ## Version support Please refer to [version support list](/reference/connector#version-support) diff --git a/docs/en/14-reference/06-taosdump.md b/docs/en/14-reference/06-taosdump.md index 6d5547e7a9..baf07d6b9e 100644 --- a/docs/en/14-reference/06-taosdump.md +++ b/docs/en/14-reference/06-taosdump.md @@ -102,6 +102,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...] -L, --loose-mode Use loose mode if the table name and column name use letter and number only. Default is NOT. -n, --no-escape No escape char '`'. Default is using it. + -Q, --dot-replace Repalce dot character with underline character in + the table name. -T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is 8. -C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service diff --git a/docs/zh/08-connector/40-csharp.mdx b/docs/zh/08-connector/40-csharp.mdx index 3a945e77fd..325c71da88 100644 --- a/docs/zh/08-connector/40-csharp.mdx +++ b/docs/zh/08-connector/40-csharp.mdx @@ -29,6 +29,10 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" 支持的平台和 TDengine 客户端驱动支持的平台一致。 +:::note +注意 TDengine 不再支持 32 位 Windows 平台。 +::: + ## 版本支持 请参考[版本支持列表](../#版本支持) diff --git a/docs/zh/12-taos-sql/20-keywords.md b/docs/zh/12-taos-sql/20-keywords.md index f52af2f282..e7e926d0b7 100644 --- a/docs/zh/12-taos-sql/20-keywords.md +++ b/docs/zh/12-taos-sql/20-keywords.md @@ -178,7 +178,7 @@ description: TDengine 保留关键字的详细列表 - MATCH - MAX_DELAY -- MAX_SPEED +- BWLIMIT - MAXROWS - MERGE - META diff --git a/docs/zh/14-reference/06-taosdump.md b/docs/zh/14-reference/06-taosdump.md index 12122edd32..9fe3c5af7a 100644 --- a/docs/zh/14-reference/06-taosdump.md +++ b/docs/zh/14-reference/06-taosdump.md @@ -105,6 +105,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...] -L, --loose-mode Using loose mode if the table name and column name use letter and number only. Default is NOT. -n, --no-escape No escape char '`'. Default is using it. + -Q, --dot-replace Repalce dot character with underline character in + the table name. -T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is 8. -C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service diff --git a/examples/JDBC/taosdemo/pom.xml b/examples/JDBC/taosdemo/pom.xml index 0d47663bba..ff64d3e1df 100644 --- a/examples/JDBC/taosdemo/pom.xml +++ b/examples/JDBC/taosdemo/pom.xml @@ -133,6 +133,7 @@ 8 8 + UTF-8 diff --git a/examples/JDBC/taosdemo/readme.md b/examples/JDBC/taosdemo/readme.md index edac970399..986eef8a05 100644 --- a/examples/JDBC/taosdemo/readme.md +++ b/examples/JDBC/taosdemo/readme.md @@ -8,4 +8,4 @@ java -jar target/taosdemo-2.0.1-jar-with-dependencies.jar -host -data ``` 如果发生错误 Exception in thread "main" java.lang.UnsatisfiedLinkError: no taos in java.library.path -请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/local/lib 来指定寻找共享库的路径。 +请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/lib 来指定寻找共享库的路径。 diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 8a6b7b5020..f74ced9190 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -113,7 +113,7 @@ #define TK_TABLE_PREFIX 95 #define TK_TABLE_SUFFIX 96 #define TK_NK_COLON 97 -#define TK_MAX_SPEED 98 +#define TK_BWLIMIT 98 #define TK_START 99 #define TK_TIMESTAMP 100 #define TK_END 101 diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 422bcd57ac..ebc4eacdf9 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm */ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong); +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong); + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring, + jint, jlong); /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqCommitAsync @@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong, + jobject); + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong, + jstring, jint, jlong, jobject); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqUnsubscribeImp @@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong, jstring, jobject); +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring, + jint); + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ffff3df5d0..8e5c6e7250 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -218,7 +218,16 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) { if (strlen(oneTable->childTableName) == 0) { SArray *dst = taosArrayDup(oneTable->tags, NULL); - RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; + ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN); + char superName[TSDB_TABLE_NAME_LEN] = {0}; + RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; + if(tsSmlDot2Underline){ + memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen); + smlStrReplace(superName, oneTable->sTableNameLen); + rName.stbFullName = superName; + }else{ + rName.stbFullName = oneTable->sTableName; + } buildChildTableName(&rName); taosArrayDestroy(dst); @@ -230,6 +239,9 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0}; size_t nLen = strlen(tinfo->childTableName); memcpy(key, currElement->measure, currElement->measureLen); + if(tsSmlDot2Underline){ + smlStrReplace(key, currElement->measureLen); + } memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen); void *uid = taosHashGet(info->tableUids, key, diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 6ec82aa6ef..2bea738c23 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI TAOS_RES *res = (TAOS_RES *)jres; return tmq_commit_sync(tmq, res); } +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniError("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + return tmq_commit_sync(tmq, NULL); +} + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset); + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return code; +} // deprecated JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, @@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy tmq_commit_async(tmq, res, consumer_callback, offset); } +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jobject offset) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + + offset = (*env)->NewGlobalRef(env, offset); + tmq_commit_async(tmq, NULL, consumer_callback, offset); +} + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset, + jobject callback) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + callback = (*env)->NewGlobalRef(env, callback); + tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; @@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment); if (res != TSDB_CODE_SUCCESS) { - (*env)->ReleaseStringUTFChars(env, jtopic, topicName); jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, tmq_err2str(res)); + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); tmq_free_assignment(pAssign); return (jint)res; } @@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign tmq_free_assignment(pAssign); return JNI_SUCCESS; } + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_committed(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS && offset != -2147467247) { + jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, + vgId, offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_position(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS) { + jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId, + offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} \ No newline at end of file diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5188b1e27c..f6fce452ed 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1340,8 +1340,9 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - void* pData = colDataGetData(pSrc, rowIdx); bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); + void* pData = NULL; + if (!isNull) pData = colDataGetData(pSrc, rowIdx); colDataSetVal(pDst, 0, pData, isNull); } diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index a3a31cfc5a..94c937e8f4 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -33,7 +33,7 @@ enum { int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId); +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2e78d03884..e83235fd74 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -73,7 +73,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} -void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ +void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* info){ SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); if (pClearMsg == NULL) { mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); @@ -82,7 +82,11 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ pClearMsg->consumerId = consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; + .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, + .pCont = pClearMsg, + .contLen = sizeof(SMqConsumerClearMsg), + .info = *info, + }; mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -119,7 +123,7 @@ void mndRebCntDec() { } } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode) { +static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser) { int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { @@ -129,6 +133,11 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * return -1; } + if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { + mndReleaseTopic(pMnode, pTopic); + return -1; + } + mndTransSetDbName(pTrans, pOneTopic, NULL); if(mndTransCheckConflict(pMnode, pTrans) != 0){ mndReleaseTopic(pMnode, pTopic); @@ -167,7 +176,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { if (pTrans == NULL) { goto FAIL; } - if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){ + if(validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){ goto FAIL; } @@ -205,7 +214,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pMsg, "clear-csm"); if (pTrans == NULL) goto FAIL; - if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode) != 0){ + if (validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user) != 0){ goto FAIL; } // this is the drop action, not the update action @@ -292,7 +301,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { if (status == MQ_CONSUMER_STATUS_READY) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); @@ -307,7 +316,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } } else if (status == MQ_CONSUMER_STATUS_LOST) { if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); } } else { // MQ_CONSUMER_STATUS_REBALANCE taosRLockLatch(&pConsumer->lock); @@ -384,6 +393,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), + .info = pMsg->info, }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } @@ -458,6 +468,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { .msgType = TDMT_MND_TMQ_CONSUMER_RECOVER, .pCont = pRecoverMsg, .contLen = sizeof(SMqConsumerRecoverMsg), + .info = pMsg->info, }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); @@ -646,7 +657,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - code = validateTopics(pTrans, pTopicList, pMnode); + code = validateTopics(pTrans, pTopicList, pMnode, pMsg->info.conn.user); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0d53ec780..716d00bcaa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1564,6 +1564,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } if (pStream->status != STREAM_STATUS__PAUSE) { + sdbRelease(pMnode->pSdb, pStream); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b4145ae8d0..b95beb32c7 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -824,7 +824,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { } if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); } sdbRelease(pMnode->pSdb, pConsumer); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index eacdaa1665..3b7d097645 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -386,6 +386,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic"); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + code = -1; goto _OUT; } @@ -400,7 +401,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN); - if (mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj) != 0) { + code = mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj); + if (code != 0) { goto _OUT; } @@ -418,6 +420,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (pCreate->withMeta) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + code = terrno; goto _OUT; } @@ -426,13 +429,15 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast); - if (nodesStringToNode(pCreate->ast, &pAst) != 0) { + code = nodesStringToNode(pCreate->ast, &pAst); + if (code != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; - if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { + code = qCreateQueryPlan(&cxt, &pPlan, NULL); + if (code != 0) { mError("failed to create topic:%s since %s", pCreate->name, terrstr()); goto _OUT; } @@ -440,6 +445,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t)); if (topicObj.ntbColIds == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; goto _OUT; } @@ -450,12 +456,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ntbColIds = NULL; } - if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { + code = qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema); + if (code != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } - if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) { + code = nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL); + if (code != 0) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); goto _OUT; } @@ -463,6 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName); if (pStb == NULL) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; + code = terrno; goto _OUT; } @@ -485,6 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + code = -1; goto _OUT; } @@ -510,7 +520,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * // encoder check alter info int32_t len; - int32_t code; tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); @@ -525,6 +534,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); + code = -1; goto _OUT; } tEncoderClear(&encoder); @@ -539,6 +549,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); + code = -1; goto _OUT; } buf = NULL; @@ -548,6 +559,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + code = -1; goto _OUT; } @@ -723,7 +735,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } if (found){ if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { - mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pReq->info); mndReleaseConsumer(pMnode, pConsumer); continue; } @@ -769,7 +781,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } code = mndDropSubByTopic(pMnode, pTrans, dropReq.name); - if ( code < 0) { + if (code < 0) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); goto end; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 43f38ade97..04e3e8c0df 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { } int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { - int32_t code = walNextValidMsg(pReader); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = 0; + + while(1) { + code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SWalCont* pCont = &pReader->pHead->head; + int64_t ver = pCont->version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); + return TSDB_CODE_SUCCESS; + } + + if (pCont->msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg)); + int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return code; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + tqError("%s failed to create data submit for stream since out of memory", id); + return code; + } + } else if (pCont->msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); + int32_t len = pCont->bodyLen - sizeof(SMsgHead); + + code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + if (code == TSDB_CODE_SUCCESS) { + if (*pItem == NULL) { + tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); + // we need to continue check next data in the wal files. + continue; + } else { + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver); + } + } else { + terrno = code; + tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); + return code; + } + + } else { + ASSERT(0); + } + return code; } - - int64_t ver = pReader->pHead->head.version; - if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); - return TSDB_CODE_SUCCESS; - } - - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; - } - - memcpy(data, pBody, len); - SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; - - *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); - if (*pItem == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", id); - return terrno; - } - } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); - - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); - } else { - tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); - } - } else { - ASSERT(0); - } - - return 0; } // todo ignore the error in wal? diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index be4cb8d2dc..9b0b43b6c4 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -177,6 +177,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i } // todo time window chosen problem: t or prev value? + if (t > pInfo->pFillInfo->start) t -= pInterval->sliding; taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); } } @@ -838,6 +839,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) { setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; + resetFillWindow(&pFillSup->prev); pFillSup->prev.key = pFillSup->cur.key; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; } else if (hasPrevWindow(pFillSup)) { @@ -851,6 +853,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); pFillSup->next.key = pFillSup->cur.key; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; @@ -1229,8 +1232,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { SWinKey nextKey = {.groupId = groupId, .ts = ts}; while (pInfo->srcDelRowIndex < pBlock->info.rows) { - void* nextVal = NULL; - int32_t nextLen = 0; TSKEY delTs = tsStarts[pInfo->srcDelRowIndex]; uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex]; int32_t code = TSDB_CODE_SUCCESS; @@ -1245,7 +1246,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { if (delTs == nextKey.ts) { code = pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur); if (code == TSDB_CODE_SUCCESS) { - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen); + code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL); } // ts will be deleted later if (delTs != ts) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9228c923a6..7d0fafff73 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -972,7 +972,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j); bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL); - char* pSrcData = colDataGetData(pSrcCol, rowIndex); + char* pSrcData = NULL; + if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex); colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull); } pDest->info.rows++; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 1cc377b3ee..7266fb461d 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + printDataBlock(p, "project"); + } + return (p->info.rows > 0) ? p : NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index da4bd1e23c..af1740750c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1247,7 +1247,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j); SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j); bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL); - char* pSrcData = colDataGetData(pSrcCol, i); + char* pSrcData = NULL; + if (!isNull) pSrcData = colDataGetData(pSrcCol, i); colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull); } pResult->info.rows++; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4f793d7064..ca47cee95c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2905,6 +2905,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2921,6 +2922,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pUpdateRes); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); taosArrayDestroy(pInfo->historyWins); taosMemoryFreeClear(param); @@ -3240,6 +3242,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* return winNum; } +static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) { + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + // Just look for the window behind StartIndex + while (1) { + SResultWindowInfo winInfo = {0}; + SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo); + if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) || + !inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) { + taosMemoryFree(winInfo.pOutputBuf); + pAPI->stateStore.streamStateFreeCur(pCur); + break; + } + pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); + doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); + pAPI->stateStore.streamStateFreeCur(pCur); + taosMemoryFree(winInfo.pOutputBuf); + } +} + int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); pWinInfo->pOutputBuf = NULL; @@ -3417,9 +3444,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo } static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; int32_t size = taosArrayGetSize(pWinArray); SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -3446,6 +3473,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin); if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); continue; } @@ -3454,6 +3482,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin); code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); break; } } @@ -3464,7 +3493,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true); saveResult(parentWin, pStUpdated); + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); } else { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); break; } } @@ -3703,11 +3734,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } void streamSessionReleaseState(SOperatorInfo* pOperator) { - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); - } + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -3719,6 +3750,33 @@ void resetWinRange(STimeWindow* winRange) { winRange->ekey = INT64_MAX; } +void streamSessionSemiReloadState(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + resetWinRange(&pAggSup->winRange); + + SResultWindowInfo winInfo = {0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + ASSERT(size == num * sizeof(SSessionKey)); + for (int32_t i = 0; i < num; i++) { + SResultWindowInfo winInfo = {0}; + setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); + compactSessionSemiWindow(pOperator, &winInfo); + saveSessionOutputBuf(pAggSup, &winInfo); + } + taosMemoryFree(pBuf); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -3948,6 +4006,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; + + if(pInfo->isHistoryOp) { + getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + } + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -3996,8 +4059,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); } - setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->operatorType = pPhyNode->type; @@ -4035,6 +4098,7 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { @@ -4048,6 +4112,7 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); taosMemoryFreeClear(param); } diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 6c3f589159..43e75e5c5a 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -286,7 +286,7 @@ retention(A) ::= NK_VARIABLE(B) NK_COLON NK_VARIABLE(C). %type speed_opt { int32_t } %destructor speed_opt { } speed_opt(A) ::= . { A = 0; } -speed_opt(A) ::= MAX_SPEED NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); } +speed_opt(A) ::= BWLIMIT NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); } start_opt(A) ::= . { A = NULL; } start_opt(A) ::= START WITH NK_INTEGER(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index ca7ac1a0b6..df01fe5fc8 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -137,7 +137,7 @@ static SKeyword keywordTable[] = { {"MATCH", TK_MATCH}, {"MAXROWS", TK_MAXROWS}, {"MAX_DELAY", TK_MAX_DELAY}, - {"MAX_SPEED", TK_MAX_SPEED}, + {"BWLIMIT", TK_BWLIMIT}, {"MERGE", TK_MERGE}, {"META", TK_META}, {"ONLY", TK_ONLY}, diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index a912fb4e71..755102395a 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -1150,7 +1150,7 @@ static const YYCODETYPE yyFallback[] = { 0, /* TABLE_PREFIX => nothing */ 0, /* TABLE_SUFFIX => nothing */ 0, /* NK_COLON => nothing */ - 0, /* MAX_SPEED => nothing */ + 0, /* BWLIMIT => nothing */ 0, /* START => nothing */ 0, /* TIMESTAMP => nothing */ 287, /* END => ABORT */ @@ -1575,7 +1575,7 @@ static const char *const yyTokenName[] = { /* 95 */ "TABLE_PREFIX", /* 96 */ "TABLE_SUFFIX", /* 97 */ "NK_COLON", - /* 98 */ "MAX_SPEED", + /* 98 */ "BWLIMIT", /* 99 */ "START", /* 100 */ "TIMESTAMP", /* 101 */ "END", @@ -2114,7 +2114,7 @@ static const char *const yyRuleName[] = { /* 140 */ "retention_list ::= retention_list NK_COMMA retention", /* 141 */ "retention ::= NK_VARIABLE NK_COLON NK_VARIABLE", /* 142 */ "speed_opt ::=", - /* 143 */ "speed_opt ::= MAX_SPEED NK_INTEGER", + /* 143 */ "speed_opt ::= BWLIMIT NK_INTEGER", /* 144 */ "start_opt ::=", /* 145 */ "start_opt ::= START WITH NK_INTEGER", /* 146 */ "start_opt ::= START WITH NK_STRING", @@ -3335,7 +3335,7 @@ static const YYCODETYPE yyRuleInfoLhs[] = { 366, /* (140) retention_list ::= retention_list NK_COMMA retention */ 369, /* (141) retention ::= NK_VARIABLE NK_COLON NK_VARIABLE */ 361, /* (142) speed_opt ::= */ - 361, /* (143) speed_opt ::= MAX_SPEED NK_INTEGER */ + 361, /* (143) speed_opt ::= BWLIMIT NK_INTEGER */ 362, /* (144) start_opt ::= */ 362, /* (145) start_opt ::= START WITH NK_INTEGER */ 362, /* (146) start_opt ::= START WITH NK_STRING */ @@ -3940,7 +3940,7 @@ static const signed char yyRuleInfoNRhs[] = { -3, /* (140) retention_list ::= retention_list NK_COMMA retention */ -3, /* (141) retention ::= NK_VARIABLE NK_COLON NK_VARIABLE */ 0, /* (142) speed_opt ::= */ - -2, /* (143) speed_opt ::= MAX_SPEED NK_INTEGER */ + -2, /* (143) speed_opt ::= BWLIMIT NK_INTEGER */ 0, /* (144) start_opt ::= */ -3, /* (145) start_opt ::= START WITH NK_INTEGER */ -3, /* (146) start_opt ::= START WITH NK_STRING */ @@ -5016,7 +5016,7 @@ static YYACTIONTYPE yy_reduce( case 330: /* bufsize_opt ::= */ yytestcase(yyruleno==330); { yymsp[1].minor.yy416 = 0; } break; - case 143: /* speed_opt ::= MAX_SPEED NK_INTEGER */ + case 143: /* speed_opt ::= BWLIMIT NK_INTEGER */ case 331: /* bufsize_opt ::= BUFSIZE NK_INTEGER */ yytestcase(yyruleno==331); { yymsp[-1].minor.yy416 = taosStr2Int32(yymsp[0].minor.yy0.z, NULL, 10); } break; diff --git a/source/libs/parser/test/parShowToUse.cpp b/source/libs/parser/test/parShowToUse.cpp index b7bd0e802c..4396f786ff 100644 --- a/source/libs/parser/test/parShowToUse.cpp +++ b/source/libs/parser/test/parShowToUse.cpp @@ -286,7 +286,7 @@ TEST_F(ParserShowToUseTest, trimDatabase) { run("TRIM DATABASE wxy_db"); setTrimDbReq("wxy_db", 100); - run("TRIM DATABASE wxy_db MAX_SPEED 100"); + run("TRIM DATABASE wxy_db BWLIMIT 100"); } TEST_F(ParserShowToUseTest, useDatabase) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4a0ce81e68..571aca9935 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1613,6 +1613,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); stateSessionKeyDecode((void*)&ktmp, (char*)curKey); + if (pVal != NULL) *pVal = NULL; + if (pVLen != NULL) *pVLen = 0; + SStateSessionKey* pKTmp = &ktmp; const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); char* val = NULL; @@ -1620,19 +1623,23 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* if (len < 0) { return -1; } + + if (pKTmp->opNum != pCur->number) { + taosMemoryFree(val); + return -1; + } + if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { + taosMemoryFree(val); + return -1; + } + if (pVal != NULL) { *pVal = (char*)val; } else { taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = len; - if (pKTmp->opNum != pCur->number) { - return -1; - } - if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { - return -1; - } + if (pVLen != NULL) *pVLen = len; *pKey = pKTmp->key; return 0; } diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index dd670595f0..ede1f1fb0e 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -885,13 +885,16 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { char *data = taosMemoryMalloc(compressSize); gzFile dstFp = NULL; - TdFilePtr pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); + TdFilePtr pFile = NULL; + TdFilePtr pSrcFile = NULL; + + pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); if (pSrcFile == NULL) { ret = -1; goto cmp_end; } - TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { ret = -2; goto cmp_end; @@ -910,6 +913,9 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { } cmp_end: + if (pFile) { + taosCloseFile(&pFile); + } if (pSrcFile) { taosCloseFile(&pSrcFile); } diff --git a/tests/system-test/2-query/fill_with_group.py b/tests/system-test/2-query/fill_with_group.py index c1ea9877a2..393102c8ed 100644 --- a/tests/system-test/2-query/fill_with_group.py +++ b/tests/system-test/2-query/fill_with_group.py @@ -130,9 +130,18 @@ class TDTestCase: for j in range(0,60): tdSql.checkData(i*1500+j, 1, None) + def test_fill_with_order_by(self): + sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart" + tdSql.query(sql) + tdSql.checkRows(1) + sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart desc" + tdSql.query(sql) + tdSql.checkRows(1) + def run(self): self.prepareTestEnv() self.test_partition_by_with_interval_fill_prev_new_group_fill_error() + self.test_fill_with_order_by() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/sml_TS-3724.py b/tests/system-test/2-query/sml_TS-3724.py index a8b16c4662..410e266f10 100644 --- a/tests/system-test/2-query/sml_TS-3724.py +++ b/tests/system-test/2-query/sml_TS-3724.py @@ -67,7 +67,7 @@ class TDTestCase: tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`") tdSql.checkRows(2) - tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times") + tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times") tdSql.checkRows(2) tdSql.checkData(0, 1, 1.300000000) tdSql.checkData(1, 1, 13.000000000) diff --git a/tests/system-test/7-tmq/subscribeDb0.py b/tests/system-test/7-tmq/subscribeDb0.py index ed13fcbe06..d4dfe425dc 100644 --- a/tests/system-test/7-tmq/subscribeDb0.py +++ b/tests/system-test/7-tmq/subscribeDb0.py @@ -237,7 +237,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt: + if totalConsumeRows < expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index e4ed6037a3..5be9d98a7f 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1533,6 +1533,7 @@ int sml_ts3724_Test() { const char *sql[] = { "stb.2,t1=1 f1=283i32 1632299372000", + "stb_2,t1=1 f1=283i32 1632299372000", ".stb2,t1=1 f1=106i32 1632299378000", "stb2.,t1=1 f1=106i32 1632299378000", }; @@ -1547,6 +1548,18 @@ int sml_ts3724_Test() { printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); taos_free_result(pRes); + pRes = taos_query(taos, "select * from stb_2"); + TAOS_ROW row = taos_fetch_row(pRes); + int numRows = taos_affected_rows(pRes); + ASSERT(numRows == 1); + taos_free_result(pRes); + + pRes = taos_query(taos, "show stables"); + row = taos_fetch_row(pRes); + numRows = taos_affected_rows(pRes); + ASSERT(numRows == 3); + taos_free_result(pRes); + taos_close(taos); return code;