From 56626f2e764a091c0839d420939982cfb74994d4 Mon Sep 17 00:00:00 2001 From: huolibo Date: Thu, 27 Jul 2023 16:52:08 +0800 Subject: [PATCH 01/21] feat(driver): add committed assignment API for jdbc --- .../jni/com_taosdata_jdbc_tmq_TMQConnector.h | 15 +++ source/client/src/clientTmqConnector.c | 108 +++++++++++++++++- 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 422bcd57ac..ebc4eacdf9 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm */ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong); +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong); + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring, + jint, jlong); /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqCommitAsync @@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong, + jobject); + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong, + jstring, jint, jlong, jobject); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqUnsubscribeImp @@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong, jstring, jobject); +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring, + jint); + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 6ec82aa6ef..487a86a589 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI TAOS_RES *res = (TAOS_RES *)jres; return tmq_commit_sync(tmq, res); } +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniError("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + return tmq_commit_sync(tmq, NULL); +} + +JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset); + if (code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return code; +} // deprecated JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, @@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy tmq_commit_async(tmq, res, consumer_callback, offset); } +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jobject offset) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + + offset = (*env)->NewGlobalRef(env, offset); + tmq_commit_async(tmq, NULL, consumer_callback, offset); +} + +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj, + jlong jtmq, jstring jtopic, + jint vgId, jlong offset, + jobject callback) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + callback = (*env)->NewGlobalRef(env, callback); + tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; @@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment); if (res != TSDB_CODE_SUCCESS) { - (*env)->ReleaseStringUTFChars(env, jtopic, topicName); jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res, tmq_err2str(res)); + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); tmq_free_assignment(pAssign); return (jint)res; } @@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign tmq_free_assignment(pAssign); return JNI_SUCCESS; } + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_committed(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS) { + jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, + vgId, offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} + +JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq, + jstring jtopic, jint vgId) { + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniDebug("jobj:%p, tmq is closed", jobj); + return TMQ_CONSUMER_NULL; + } + + if (jtopic == NULL) { + jniDebug("jobj:%p, topic is null", jobj); + return TMQ_TOPIC_NULL; + } + + const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL); + + int64_t offset = tmq_position(tmq, topicName, vgId); + + if (offset < JNI_SUCCESS) { + jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId, + offset, tmq_err2str(offset)); + } + + (*env)->ReleaseStringUTFChars(env, jtopic, topicName); + return (jlong)offset; +} \ No newline at end of file From 6b61da1a41eaeab0e56e1ec3bec40d4faf89573e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 23 Aug 2023 15:34:46 +0800 Subject: [PATCH 02/21] reload semi session state --- source/libs/executor/src/timewindowoperator.c | 79 ++++++++++++++++--- 1 file changed, 70 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4f793d7064..37f737c2ce 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3240,6 +3240,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* return winNum; } +static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) { + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + // Just look for the window behind StartIndex + while (1) { + SResultWindowInfo winInfo = {0}; + SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo); + if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) || + !inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) { + taosMemoryFree(winInfo.pOutputBuf); + pAPI->stateStore.streamStateFreeCur(pCur); + break; + } + pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); + doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); + pAPI->stateStore.streamStateFreeCur(pCur); + taosMemoryFree(winInfo.pOutputBuf); + } +} + int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); pWinInfo->pOutputBuf = NULL; @@ -3417,9 +3442,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo } static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; int32_t size = taosArrayGetSize(pWinArray); SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -3446,6 +3471,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin); if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); continue; } @@ -3454,6 +3480,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin); code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); break; } } @@ -3464,7 +3491,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true); saveResult(parentWin, pStUpdated); + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); } else { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); break; } } @@ -3703,11 +3732,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } void streamSessionReleaseState(SOperatorInfo* pOperator) { - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); - } + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -3719,6 +3748,33 @@ void resetWinRange(STimeWindow* winRange) { winRange->ekey = INT64_MAX; } +void streamSessionSemiReloadState(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + resetWinRange(&pAggSup->winRange); + + SResultWindowInfo winInfo = {0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + ASSERT(size == num * sizeof(SSessionKey)); + for (int32_t i = 0; i < num; i++) { + SResultWindowInfo winInfo = {0}; + setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); + compactSessionSemiWindow(pOperator, &winInfo); + saveSessionOutputBuf(pAggSup, &winInfo); + } + taosMemoryFree(pBuf); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -3948,6 +4004,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; + + if(pInfo->isHistoryOp) { + getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + } + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -3996,8 +4057,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); + setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); } - setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->operatorType = pPhyNode->type; From 7b9d14aad369f244fe6449e7bf65b4c4a4fa5afc Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 24 Aug 2023 10:34:35 +0800 Subject: [PATCH 03/21] docs: add note to csharp connector docs (#22541) * docs: fix taos_init() return type * docs: refine c interface doc * docs: add platform note to csharp connector. --- docs/en/14-reference/03-connector/09-csharp.mdx | 4 ++++ docs/zh/08-connector/40-csharp.mdx | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/docs/en/14-reference/03-connector/09-csharp.mdx b/docs/en/14-reference/03-connector/09-csharp.mdx index 718462295a..203d44fe02 100644 --- a/docs/en/14-reference/03-connector/09-csharp.mdx +++ b/docs/en/14-reference/03-connector/09-csharp.mdx @@ -30,6 +30,10 @@ The source code of `TDengine.Connector` is hosted on [GitHub](https://github.com The supported platforms are the same as those supported by the TDengine client driver. +:::note +Please note TDengine does not support 32bit Windows any more. +::: + ## Version support Please refer to [version support list](/reference/connector#version-support) diff --git a/docs/zh/08-connector/40-csharp.mdx b/docs/zh/08-connector/40-csharp.mdx index 3a945e77fd..325c71da88 100644 --- a/docs/zh/08-connector/40-csharp.mdx +++ b/docs/zh/08-connector/40-csharp.mdx @@ -29,6 +29,10 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" 支持的平台和 TDengine 客户端驱动支持的平台一致。 +:::note +注意 TDengine 不再支持 32 位 Windows 平台。 +::: + ## 版本支持 请参考[版本支持列表](../#版本支持) From ed70cd1f63fa876aa8c2523e348c262dd2883e40 Mon Sep 17 00:00:00 2001 From: huolibo Date: Thu, 24 Aug 2023 10:55:41 +0800 Subject: [PATCH 04/21] fix: jni error log --- source/client/src/clientTmqConnector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 487a86a589..2bea738c23 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -590,7 +590,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp( int64_t offset = tmq_committed(tmq, topicName, vgId); - if (offset < JNI_SUCCESS) { + if (offset < JNI_SUCCESS && offset != -2147467247) { jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId, offset, tmq_err2str(offset)); } From 7ebce2814f365e721b9228d33dc974aeb06f5b5a Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 11:18:10 +0800 Subject: [PATCH 05/21] mem leak --- source/libs/executor/src/filloperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 3 +++ source/libs/stream/src/streamBackendRocksdb.c | 19 +++++++++++-------- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index be4cb8d2dc..bf7da7505a 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -851,6 +851,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); pFillSup->next.key = pFillSup->cur.key; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 37f737c2ce..16eaf0649d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2905,6 +2905,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -4096,6 +4097,7 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { @@ -4109,6 +4111,7 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); taosMemoryFreeClear(param); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4a0ce81e68..1981cd76b3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1620,19 +1620,22 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* if (len < 0) { return -1; } + + if (pVLen != NULL) *pVLen = len; + + if (pKTmp->opNum != pCur->number) { + taosMemoryFree(val); + return -1; + } + if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { + taosMemoryFree(val); + return -1; + } if (pVal != NULL) { *pVal = (char*)val; } else { taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = len; - - if (pKTmp->opNum != pCur->number) { - return -1; - } - if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) { - return -1; - } *pKey = pKTmp->key; return 0; } From 1e108c4178bd6cc4051e2715c9d063b90b8da4ca Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 11:34:43 +0800 Subject: [PATCH 06/21] mem leak --- source/libs/stream/src/streamBackendRocksdb.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1981cd76b3..571aca9935 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1613,6 +1613,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); stateSessionKeyDecode((void*)&ktmp, (char*)curKey); + if (pVal != NULL) *pVal = NULL; + if (pVLen != NULL) *pVLen = 0; + SStateSessionKey* pKTmp = &ktmp; const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); char* val = NULL; @@ -1621,8 +1624,6 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return -1; } - if (pVLen != NULL) *pVLen = len; - if (pKTmp->opNum != pCur->number) { taosMemoryFree(val); return -1; @@ -1631,11 +1632,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* taosMemoryFree(val); return -1; } + if (pVal != NULL) { *pVal = (char*)val; } else { taosMemoryFree(val); } + + if (pVLen != NULL) *pVLen = len; *pKey = pKTmp->key; return 0; } From 3c8bd57140463a8faf45db4c83fd0b3840c4ad42 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 23 Aug 2023 14:54:41 +0800 Subject: [PATCH 07/21] fix: fill operator with desc ts order returned 1 more rows --- source/libs/executor/src/filloperator.c | 1 + tests/system-test/2-query/fill_with_group.py | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index be4cb8d2dc..e8bdd37616 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -177,6 +177,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i } // todo time window chosen problem: t or prev value? + if (t > pInfo->pFillInfo->start) t -= pInterval->sliding; taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t); } } diff --git a/tests/system-test/2-query/fill_with_group.py b/tests/system-test/2-query/fill_with_group.py index c1ea9877a2..393102c8ed 100644 --- a/tests/system-test/2-query/fill_with_group.py +++ b/tests/system-test/2-query/fill_with_group.py @@ -130,9 +130,18 @@ class TDTestCase: for j in range(0,60): tdSql.checkData(i*1500+j, 1, None) + def test_fill_with_order_by(self): + sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart" + tdSql.query(sql) + tdSql.checkRows(1) + sql = "select _wstart, _wend, count(ts), sum(c1) from meters where ts > '2018-11-25 00:00:00.000' and ts < '2018-11-26 00:00:00.00' interval(1d) fill(NULL) order by _wstart desc" + tdSql.query(sql) + tdSql.checkRows(1) + def run(self): self.prepareTestEnv() self.test_partition_by_with_interval_fill_prev_new_group_fill_error() + self.test_fill_with_order_by() def stop(self): tdSql.close() From b142979f7fd190c5851d47bd03e0a8c714c6e261 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 15:00:09 +0800 Subject: [PATCH 08/21] mem leak --- source/common/src/tdatablock.c | 3 ++- source/libs/executor/src/filloperator.c | 5 ++--- source/libs/executor/src/groupoperator.c | 3 ++- source/libs/executor/src/projectoperator.c | 4 ++++ source/libs/executor/src/scanoperator.c | 3 ++- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5188b1e27c..f6fce452ed 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1340,8 +1340,9 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); - void* pData = colDataGetData(pSrc, rowIdx); bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); + void* pData = NULL; + if (!isNull) pData = colDataGetData(pSrc, rowIdx); colDataSetVal(pDst, 0, pData, isNull); } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index bf7da7505a..7dbdb547d3 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -838,6 +838,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) { setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; + resetFillWindow(&pFillSup->prev); pFillSup->prev.key = pFillSup->cur.key; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; } else if (hasPrevWindow(pFillSup)) { @@ -1230,8 +1231,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { SWinKey nextKey = {.groupId = groupId, .ts = ts}; while (pInfo->srcDelRowIndex < pBlock->info.rows) { - void* nextVal = NULL; - int32_t nextLen = 0; TSKEY delTs = tsStarts[pInfo->srcDelRowIndex]; uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex]; int32_t code = TSDB_CODE_SUCCESS; @@ -1246,7 +1245,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { if (delTs == nextKey.ts) { code = pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur); if (code == TSDB_CODE_SUCCESS) { - code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen); + code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL); } // ts will be deleted later if (delTs != ts) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9228c923a6..7d0fafff73 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -972,7 +972,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId); SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j); bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL); - char* pSrcData = colDataGetData(pSrcCol, rowIndex); + char* pSrcData = NULL; + if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex); colDataSetVal(pDestCol, pDest->info.rows, pSrcData, isNull); } pDest->info.rows++; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 1cc377b3ee..7266fb461d 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + printDataBlock(p, "project"); + } + return (p->info.rows > 0) ? p : NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index da4bd1e23c..af1740750c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1247,7 +1247,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j); SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j); bool isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL); - char* pSrcData = colDataGetData(pSrcCol, i); + char* pSrcData = NULL; + if (!isNull) pSrcData = colDataGetData(pSrcCol, i); colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull); } pResult->info.rows++; From 4c6bc4d2c3f33686f138d75101371191cf405f5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 16:10:03 +0800 Subject: [PATCH 09/21] fix(stream): continue check wal when meeting empty delete block msg. --- source/dnode/vnode/src/tq/tqRead.c | 107 +++++++++++++++++------------ 1 file changed, 62 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 43f38ade97..d3157dc3b0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -296,53 +296,70 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { } int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { - int32_t code = walNextValidMsg(pReader); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = 0; + SWalCont* pCont = &pReader->pHead->head; + + while(1) { + code = walNextValidMsg(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int64_t ver = pCont->version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); + return TSDB_CODE_SUCCESS; + } + + if (pCont->msgType == TDMT_VND_SUBMIT) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg)); + int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg); + + void* data = taosMemoryMalloc(len); + if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + + tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); + return code; + } + + memcpy(data, pBody, len); + SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; + + *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); + if (*pItem == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + terrno = code; + tqError("%s failed to create data submit for stream since out of memory", id); + return code; + } + } else if (pCont->msgType == TDMT_VND_DELETE) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); + int32_t len = pCont->bodyLen - sizeof(SMsgHead); + + code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); + if (code == TSDB_CODE_SUCCESS) { + if (*pItem == NULL) { + tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); + // we need to continue check next data in the wal files. + continue; + } else { + tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver); + } + } else { + terrno = code; + tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); + return code; + } + + } else { + ASSERT(0); + } + return code; } - - int64_t ver = pReader->pHead->head.version; - if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); - return TSDB_CODE_SUCCESS; - } - - if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return -1; - } - - memcpy(data, pBody, len); - SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data}; - - *pItem = (SStreamQueueItem*)streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT); - if (*pItem == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", id); - return terrno; - } - } else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) { - void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); - int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); - - code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code)); - } else { - tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); - } - } else { - ASSERT(0); - } - - return 0; } // todo ignore the error in wal? From 5344efe181f22160a3993051757f9a90aa3c9e8a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 16:22:24 +0800 Subject: [PATCH 10/21] fix(stream): adjust the ptr. --- source/dnode/vnode/src/tq/tqRead.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d3157dc3b0..04e3e8c0df 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -297,7 +297,6 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { int32_t code = 0; - SWalCont* pCont = &pReader->pHead->head; while(1) { code = walNextValidMsg(pReader); @@ -305,7 +304,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con return code; } - int64_t ver = pCont->version; + SWalCont* pCont = &pReader->pHead->head; + int64_t ver = pCont->version; if (ver > maxVer) { tqDebug("maxVer in WAL:%" PRId64 " reached current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id); return TSDB_CODE_SUCCESS; From a72e6fd2196e2f369efb1c9208e2a4d426e333e8 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 16:32:34 +0800 Subject: [PATCH 11/21] mem leak --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0d53ec780..716d00bcaa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1564,6 +1564,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } if (pStream->status != STREAM_STATUS__PAUSE) { + sdbRelease(pMnode->pSdb, pStream); return 0; } From 7da464d8b7f6545017297878b5cd35d492d1f3dd Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 16:36:58 +0800 Subject: [PATCH 12/21] mem leak --- source/libs/executor/src/timewindowoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 16eaf0649d..ca47cee95c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2922,6 +2922,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pUpdateRes); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); taosArrayDestroy(pInfo->historyWins); taosMemoryFreeClear(param); From b83cc11043334f79eac15cf220d406dfdb51205a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 16:38:53 +0800 Subject: [PATCH 13/21] fix:[TD-25651] reset epoch if consumer changed to avoid consumeing no data --- source/dnode/vnode/src/tq/tq.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 815e9647b5..c8da7e0b46 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -879,20 +879,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - // atomic_add_fetch_32(&pHandle->epoch, 1); + atomic_store_32(&pHandle->epoch, 0); - // kill executing task - // if(tqIsHandleExec(pHandle)) { - // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - // if (pTaskInfo != NULL) { - // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - // } - - // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - // qStreamCloseTsdbReader(pTaskInfo); - // } - // } - // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } From 3c6870275bdd3162e3d3b204c693805d029997e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 16:45:47 +0800 Subject: [PATCH 14/21] fix(parser): update the key words for disk io throttling. --- docs/en/12-taos-sql/20-keywords.md | 2 +- docs/zh/12-taos-sql/20-keywords.md | 2 +- include/common/ttokendef.h | 2 +- source/libs/parser/inc/sql.y | 2 +- source/libs/parser/src/parTokenizer.c | 2 +- source/libs/parser/src/sql.c | 12 ++++++------ source/libs/parser/test/parShowToUse.cpp | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/en/12-taos-sql/20-keywords.md b/docs/en/12-taos-sql/20-keywords.md index d563181b87..983d4f63c9 100644 --- a/docs/en/12-taos-sql/20-keywords.md +++ b/docs/en/12-taos-sql/20-keywords.md @@ -178,7 +178,7 @@ The following list shows all reserved keywords: - MATCH - MAX_DELAY -- MAX_SPEED +- BWLIMIT - MAXROWS - MERGE - META diff --git a/docs/zh/12-taos-sql/20-keywords.md b/docs/zh/12-taos-sql/20-keywords.md index f52af2f282..e7e926d0b7 100644 --- a/docs/zh/12-taos-sql/20-keywords.md +++ b/docs/zh/12-taos-sql/20-keywords.md @@ -178,7 +178,7 @@ description: TDengine 保留关键字的详细列表 - MATCH - MAX_DELAY -- MAX_SPEED +- BWLIMIT - MAXROWS - MERGE - META diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 8a6b7b5020..f74ced9190 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -113,7 +113,7 @@ #define TK_TABLE_PREFIX 95 #define TK_TABLE_SUFFIX 96 #define TK_NK_COLON 97 -#define TK_MAX_SPEED 98 +#define TK_BWLIMIT 98 #define TK_START 99 #define TK_TIMESTAMP 100 #define TK_END 101 diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 6c3f589159..43e75e5c5a 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -286,7 +286,7 @@ retention(A) ::= NK_VARIABLE(B) NK_COLON NK_VARIABLE(C). %type speed_opt { int32_t } %destructor speed_opt { } speed_opt(A) ::= . { A = 0; } -speed_opt(A) ::= MAX_SPEED NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); } +speed_opt(A) ::= BWLIMIT NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); } start_opt(A) ::= . { A = NULL; } start_opt(A) ::= START WITH NK_INTEGER(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B); } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index ca7ac1a0b6..df01fe5fc8 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -137,7 +137,7 @@ static SKeyword keywordTable[] = { {"MATCH", TK_MATCH}, {"MAXROWS", TK_MAXROWS}, {"MAX_DELAY", TK_MAX_DELAY}, - {"MAX_SPEED", TK_MAX_SPEED}, + {"BWLIMIT", TK_BWLIMIT}, {"MERGE", TK_MERGE}, {"META", TK_META}, {"ONLY", TK_ONLY}, diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index a912fb4e71..755102395a 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -1150,7 +1150,7 @@ static const YYCODETYPE yyFallback[] = { 0, /* TABLE_PREFIX => nothing */ 0, /* TABLE_SUFFIX => nothing */ 0, /* NK_COLON => nothing */ - 0, /* MAX_SPEED => nothing */ + 0, /* BWLIMIT => nothing */ 0, /* START => nothing */ 0, /* TIMESTAMP => nothing */ 287, /* END => ABORT */ @@ -1575,7 +1575,7 @@ static const char *const yyTokenName[] = { /* 95 */ "TABLE_PREFIX", /* 96 */ "TABLE_SUFFIX", /* 97 */ "NK_COLON", - /* 98 */ "MAX_SPEED", + /* 98 */ "BWLIMIT", /* 99 */ "START", /* 100 */ "TIMESTAMP", /* 101 */ "END", @@ -2114,7 +2114,7 @@ static const char *const yyRuleName[] = { /* 140 */ "retention_list ::= retention_list NK_COMMA retention", /* 141 */ "retention ::= NK_VARIABLE NK_COLON NK_VARIABLE", /* 142 */ "speed_opt ::=", - /* 143 */ "speed_opt ::= MAX_SPEED NK_INTEGER", + /* 143 */ "speed_opt ::= BWLIMIT NK_INTEGER", /* 144 */ "start_opt ::=", /* 145 */ "start_opt ::= START WITH NK_INTEGER", /* 146 */ "start_opt ::= START WITH NK_STRING", @@ -3335,7 +3335,7 @@ static const YYCODETYPE yyRuleInfoLhs[] = { 366, /* (140) retention_list ::= retention_list NK_COMMA retention */ 369, /* (141) retention ::= NK_VARIABLE NK_COLON NK_VARIABLE */ 361, /* (142) speed_opt ::= */ - 361, /* (143) speed_opt ::= MAX_SPEED NK_INTEGER */ + 361, /* (143) speed_opt ::= BWLIMIT NK_INTEGER */ 362, /* (144) start_opt ::= */ 362, /* (145) start_opt ::= START WITH NK_INTEGER */ 362, /* (146) start_opt ::= START WITH NK_STRING */ @@ -3940,7 +3940,7 @@ static const signed char yyRuleInfoNRhs[] = { -3, /* (140) retention_list ::= retention_list NK_COMMA retention */ -3, /* (141) retention ::= NK_VARIABLE NK_COLON NK_VARIABLE */ 0, /* (142) speed_opt ::= */ - -2, /* (143) speed_opt ::= MAX_SPEED NK_INTEGER */ + -2, /* (143) speed_opt ::= BWLIMIT NK_INTEGER */ 0, /* (144) start_opt ::= */ -3, /* (145) start_opt ::= START WITH NK_INTEGER */ -3, /* (146) start_opt ::= START WITH NK_STRING */ @@ -5016,7 +5016,7 @@ static YYACTIONTYPE yy_reduce( case 330: /* bufsize_opt ::= */ yytestcase(yyruleno==330); { yymsp[1].minor.yy416 = 0; } break; - case 143: /* speed_opt ::= MAX_SPEED NK_INTEGER */ + case 143: /* speed_opt ::= BWLIMIT NK_INTEGER */ case 331: /* bufsize_opt ::= BUFSIZE NK_INTEGER */ yytestcase(yyruleno==331); { yymsp[-1].minor.yy416 = taosStr2Int32(yymsp[0].minor.yy0.z, NULL, 10); } break; diff --git a/source/libs/parser/test/parShowToUse.cpp b/source/libs/parser/test/parShowToUse.cpp index b7bd0e802c..4396f786ff 100644 --- a/source/libs/parser/test/parShowToUse.cpp +++ b/source/libs/parser/test/parShowToUse.cpp @@ -286,7 +286,7 @@ TEST_F(ParserShowToUseTest, trimDatabase) { run("TRIM DATABASE wxy_db"); setTrimDbReq("wxy_db", 100); - run("TRIM DATABASE wxy_db MAX_SPEED 100"); + run("TRIM DATABASE wxy_db BWLIMIT 100"); } TEST_F(ParserShowToUseTest, useDatabase) { From 64959f14e9440049bf4e8e2e39ef95d0230fd8c1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 18:18:02 +0800 Subject: [PATCH 15/21] fix:dot process in schemaless --- source/client/src/clientSml.c | 14 +++++++++++++- utils/test/c/sml_test.c | 13 +++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ffff3df5d0..8e5c6e7250 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -218,7 +218,16 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) { if (strlen(oneTable->childTableName) == 0) { SArray *dst = taosArrayDup(oneTable->tags, NULL); - RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; + ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN); + char superName[TSDB_TABLE_NAME_LEN] = {0}; + RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; + if(tsSmlDot2Underline){ + memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen); + smlStrReplace(superName, oneTable->sTableNameLen); + rName.stbFullName = superName; + }else{ + rName.stbFullName = oneTable->sTableName; + } buildChildTableName(&rName); taosArrayDestroy(dst); @@ -230,6 +239,9 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0}; size_t nLen = strlen(tinfo->childTableName); memcpy(key, currElement->measure, currElement->measureLen); + if(tsSmlDot2Underline){ + smlStrReplace(key, currElement->measureLen); + } memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen); void *uid = taosHashGet(info->tableUids, key, diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index e4ed6037a3..5be9d98a7f 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1533,6 +1533,7 @@ int sml_ts3724_Test() { const char *sql[] = { "stb.2,t1=1 f1=283i32 1632299372000", + "stb_2,t1=1 f1=283i32 1632299372000", ".stb2,t1=1 f1=106i32 1632299378000", "stb2.,t1=1 f1=106i32 1632299378000", }; @@ -1547,6 +1548,18 @@ int sml_ts3724_Test() { printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); taos_free_result(pRes); + pRes = taos_query(taos, "select * from stb_2"); + TAOS_ROW row = taos_fetch_row(pRes); + int numRows = taos_affected_rows(pRes); + ASSERT(numRows == 1); + taos_free_result(pRes); + + pRes = taos_query(taos, "show stables"); + row = taos_fetch_row(pRes); + numRows = taos_affected_rows(pRes); + ASSERT(numRows == 3); + taos_free_result(pRes); + taos_close(taos); return code; From d5cc4155420a2e93b012c56827791ea2cf7810dd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 18:35:53 +0800 Subject: [PATCH 16/21] fix:test case error --- tests/system-test/7-tmq/subscribeDb0.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/7-tmq/subscribeDb0.py b/tests/system-test/7-tmq/subscribeDb0.py index ed13fcbe06..d4dfe425dc 100644 --- a/tests/system-test/7-tmq/subscribeDb0.py +++ b/tests/system-test/7-tmq/subscribeDb0.py @@ -237,7 +237,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt: + if totalConsumeRows < expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") From 8045f30be8b8d27f82a854f503314cb2431c48fb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Aug 2023 23:15:35 +0800 Subject: [PATCH 17/21] fix:sml test case error for tbname --- tests/system-test/2-query/sml_TS-3724.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/sml_TS-3724.py b/tests/system-test/2-query/sml_TS-3724.py index a8b16c4662..410e266f10 100644 --- a/tests/system-test/2-query/sml_TS-3724.py +++ b/tests/system-test/2-query/sml_TS-3724.py @@ -67,7 +67,7 @@ class TDTestCase: tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`") tdSql.checkRows(2) - tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times") + tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times") tdSql.checkRows(2) tdSql.checkData(0, 1, 1.300000000) tdSql.checkData(1, 1, 13.000000000) From f7b42ad41b9a301009749a67c95a20e26a47bce6 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 25 Aug 2023 09:22:29 +0800 Subject: [PATCH 18/21] fix: specify utf-8 in jdbc example pom.xml (#22565) * fix: use latest version of jdbc connector * fix: remove locale and timezone to avoid confusing user * fix: update readme.md * fix: refine demo.c * fix: specify utf-8 in jdbc example pom.xml --- examples/JDBC/taosdemo/pom.xml | 1 + examples/JDBC/taosdemo/readme.md | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/JDBC/taosdemo/pom.xml b/examples/JDBC/taosdemo/pom.xml index 0d47663bba..ff64d3e1df 100644 --- a/examples/JDBC/taosdemo/pom.xml +++ b/examples/JDBC/taosdemo/pom.xml @@ -133,6 +133,7 @@ 8 8 + UTF-8 diff --git a/examples/JDBC/taosdemo/readme.md b/examples/JDBC/taosdemo/readme.md index edac970399..986eef8a05 100644 --- a/examples/JDBC/taosdemo/readme.md +++ b/examples/JDBC/taosdemo/readme.md @@ -8,4 +8,4 @@ java -jar target/taosdemo-2.0.1-jar-with-dependencies.jar -host -data ``` 如果发生错误 Exception in thread "main" java.lang.UnsatisfiedLinkError: no taos in java.library.path -请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/local/lib 来指定寻找共享库的路径。 +请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/lib 来指定寻找共享库的路径。 From 250a8a7c39f664678a88973906eab40be33d3076 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 25 Aug 2023 10:06:59 +0800 Subject: [PATCH 19/21] fix: taosCompressFile mem leak --- source/os/src/osFile.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index dd670595f0..ede1f1fb0e 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -885,13 +885,16 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { char *data = taosMemoryMalloc(compressSize); gzFile dstFp = NULL; - TdFilePtr pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); + TdFilePtr pFile = NULL; + TdFilePtr pSrcFile = NULL; + + pSrcFile = taosOpenFile(srcFileName, TD_FILE_READ | TD_FILE_STREAM); if (pSrcFile == NULL) { ret = -1; goto cmp_end; } - TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { ret = -2; goto cmp_end; @@ -910,6 +913,9 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { } cmp_end: + if (pFile) { + taosCloseFile(&pFile); + } if (pSrcFile) { taosCloseFile(&pSrcFile); } From 29aaf1c86857fb11ef21423da0dc2d81040c4389 Mon Sep 17 00:00:00 2001 From: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Date: Fri, 25 Aug 2023 10:51:17 +0800 Subject: [PATCH 20/21] Update 06-taosdump.md dot replace with '-Q' --- docs/zh/14-reference/06-taosdump.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/zh/14-reference/06-taosdump.md b/docs/zh/14-reference/06-taosdump.md index 12122edd32..9fe3c5af7a 100644 --- a/docs/zh/14-reference/06-taosdump.md +++ b/docs/zh/14-reference/06-taosdump.md @@ -105,6 +105,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...] -L, --loose-mode Using loose mode if the table name and column name use letter and number only. Default is NOT. -n, --no-escape No escape char '`'. Default is using it. + -Q, --dot-replace Repalce dot character with underline character in + the table name. -T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is 8. -C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service From acc12036a561aafebae5f02bde87b9629ac2bfca Mon Sep 17 00:00:00 2001 From: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Date: Fri, 25 Aug 2023 10:53:14 +0800 Subject: [PATCH 21/21] Update 06-taosdump.md dot replace with '-Q' (English) --- docs/en/14-reference/06-taosdump.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/en/14-reference/06-taosdump.md b/docs/en/14-reference/06-taosdump.md index 6d5547e7a9..baf07d6b9e 100644 --- a/docs/en/14-reference/06-taosdump.md +++ b/docs/en/14-reference/06-taosdump.md @@ -102,6 +102,8 @@ Usage: taosdump [OPTION...] dbname [tbname ...] -L, --loose-mode Use loose mode if the table name and column name use letter and number only. Default is NOT. -n, --no-escape No escape char '`'. Default is using it. + -Q, --dot-replace Repalce dot character with underline character in + the table name. -T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is 8. -C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service