From 98cbb92ad9cb11f6e850a7d998d944ff24135ea8 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Thu, 14 Sep 2023 14:43:07 +0800 Subject: [PATCH 01/10] merge stable sort intead of qsort as insert unordered data --- include/util/talgo.h | 10 +++ include/util/tarray.h | 9 ++- source/common/src/tdataformat.c | 2 +- source/util/src/talgo.c | 79 ++++++++++++++++++++++++ source/util/src/tarray.c | 4 ++ source/util/test/CMakeLists.txt | 8 +++ source/util/test/talgoTest.cpp | 104 ++++++++++++++++++++++++++++++++ 7 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 source/util/test/talgoTest.cpp diff --git a/include/util/talgo.h b/include/util/talgo.h index 7c92c0fe87..675bb66431 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -54,6 +54,16 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void */ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn); +/** + * merge sort, with the compare function requiring additional parameters support + * + * @param src + * @param numOfElem + * @param size + * @param comparFn + */ +void taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn); + /** * binary search, with range support * diff --git a/include/util/tarray.h b/include/util/tarray.h index 4d9c930521..17fc69cde5 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -214,12 +214,19 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp); void taosArraySwap(SArray* a, SArray* b); /** - * sort the array + * sort the array use qsort * @param pArray * @param compar */ void taosArraySort(SArray* pArray, __compar_fn_t comparFn); +/** + * sort the array use merge sort + * @param pArray + * @param compar + */ +void taosArrayMSort(SArray* pArray, __compar_fn_t comparFn); + /** * search the array * @param pArray diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 62504139f0..1f754cd4b8 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -612,7 +612,7 @@ _exit: void tRowSort(SArray *aRowP) { if (TARRAY_SIZE(aRowP) <= 1) return; - taosArraySort(aRowP, tRowPCmprFn); + taosArrayMSort(aRowP, tRowPCmprFn); } int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) { diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index e373850b3c..a39dd4cc99 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -273,3 +273,82 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, taosMemoryFree(buf); } + +static void taosMerge(void *src, int32_t start, int32_t leftend, int32_t end, int64_t size, const void *param, + __ext_compar_fn_t comparFn, void *tmp) { + int32_t leftSize = leftend - start + 1; + int32_t rightSize = end - leftend; + + void *leftBuf = tmp; + void *rightBuf = (char *)tmp + (leftSize * size); + + memcpy(leftBuf, elePtrAt(src, size, start), leftSize * size); + memcpy(rightBuf, elePtrAt(src, size, leftend + 1), rightSize * size); + + int32_t i = 0, j = 0, k = start; + + while (i < leftSize && j < rightSize) { + int32_t ret = comparFn(elePtrAt(leftBuf, size, i), elePtrAt(rightBuf, size, j), param); + if (ret <= 0) { + memcpy(elePtrAt(src, size, k), elePtrAt(leftBuf, size, i), size); + i++; + } else { + memcpy(elePtrAt(src, size, k), elePtrAt(rightBuf, size, j), size); + j++; + } + k++; + } + + while (i < leftSize) { + memcpy(elePtrAt(src, size, k), elePtrAt(leftBuf, size, i), size); + i++; + k++; + } + + while (j < rightSize) { + memcpy(elePtrAt(src, size, k), elePtrAt(rightBuf, size, j), size); + j++; + k++; + } +} + +static void taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn) { + // short array sort, instead of merge sort process + const int32_t THRESHOLD_SIZE = 6; + char *buf = taosMemoryCalloc(1, size); // prepare the swap buffer + for (int32_t start = 0; start < numOfElem - 1; start += THRESHOLD_SIZE) { + int32_t end = (start + THRESHOLD_SIZE - 1) <= numOfElem - 1 ? (start + THRESHOLD_SIZE - 1) : numOfElem - 1; + tInsertSort(src, size, start, end, param, comparFn, buf); + } + taosMemoryFreeClear(buf); + + if (numOfElem > THRESHOLD_SIZE) { + int32_t currSize; + void *tmp = taosMemoryMalloc(numOfElem * size); + + for (currSize = THRESHOLD_SIZE; currSize <= numOfElem - 1; currSize = 2 * currSize) { + int32_t leftStart; + for (leftStart = 0; leftStart < numOfElem - 1; leftStart += 2 * currSize) { + int32_t leftend = leftStart + currSize - 1; + int32_t rightEnd = + (leftStart + 2 * currSize - 1 < numOfElem - 1) ? (leftStart + 2 * currSize - 1) : (numOfElem - 1); + if (leftend >= rightEnd) break; + + taosMerge(src, leftStart, leftend, rightEnd, size, param, comparFn, tmp); + } + } + + taosMemoryFreeClear(tmp); + } +} + +int32_t msortHelper(const void *p1, const void *p2, const void *param) { + __compar_fn_t comparFn = param; + return comparFn(p1, p2); +} + + +void taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) { + void *param = comparFn; + taosMergeSortHelper(src, numOfElem, size, param, msortHelper); +} diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 8e7c0f9584..0a71061c52 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -417,6 +417,10 @@ void taosArraySort(SArray* pArray, __compar_fn_t compar) { taosSort(pArray->pData, pArray->size, pArray->elemSize, compar); } +void taosArrayMSort(SArray* pArray, __compar_fn_t compar) { + taosMergeSort(pArray->pData, pArray->size, pArray->elemSize, compar); +} + void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags); } diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 0bf06e6f44..94f8deee44 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -84,3 +84,11 @@ add_test( NAME pageBufferTest COMMAND pageBufferTest ) + +# talgoTest +add_executable(talgoTest "talgoTest.cpp") +target_link_libraries(talgoTest os util gtest_main) +add_test( + NAME talgoTest + COMMAND talgoTest +) diff --git a/source/util/test/talgoTest.cpp b/source/util/test/talgoTest.cpp new file mode 100644 index 0000000000..b5a8db7378 --- /dev/null +++ b/source/util/test/talgoTest.cpp @@ -0,0 +1,104 @@ +#include +#include +#include "talgo.h" + +struct TestStruct { + int a; + float b; +}; + +// Define a custom comparison function for testing +int cmpFunc(const void* a, const void* b) { + const TestStruct* pa = reinterpret_cast(a); + const TestStruct* pb = reinterpret_cast(b); + if (pa->a < pb->a) { + return -1; + } else if (pa->a > pb->a) { + return 1; + } else { + return 0; + } +} + +TEST(utilTest, taosMSort) { + // Create an array of test data + TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}}; + + // Sort the array using taosSort + taosMergeSort(arr, 6, sizeof(TestStruct), cmpFunc); + + for (int i = 0; i < sizeof(arr) / sizeof(TestStruct); i++) { + printf("%d: %d %f\n", i, arr[i].a, arr[i].b); + } + + // Check that the array is sorted correctly + EXPECT_EQ(arr[0].a, 1); + EXPECT_EQ(arr[1].a, 2); + EXPECT_EQ(arr[2].a, 3); + EXPECT_EQ(arr[2].b, 6); + EXPECT_EQ(arr[3].a, 3); + EXPECT_EQ(arr[3].b, 2); + EXPECT_EQ(arr[4].a, 3); + EXPECT_EQ(arr[4].b, 5); + EXPECT_EQ(arr[5].a, 4); +} + +int cmpInt(const void* a, const void* b) { + int int_a = *((int*)a); + int int_b = *((int*)b); + + if (int_a == int_b) + return 0; + else if (int_a < int_b) + return -1; + else + return 1; +} + +TEST(utilTest, taosMSort2) { + clock_t start_time, end_time; + double cpu_time_used; + + int times = 10000; + start_time = clock(); + for (int i = 0; i < 10000; i++) { + TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}}; + taosMergeSort(arr, 6, sizeof(TestStruct), cmpFunc); + } + end_time = clock(); + cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC; + printf("taosMSort %d times: %f s\n", times, cpu_time_used); + + start_time = clock(); + for (int i = 0; i < 10000; i++) { + TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}}; + taosSort(arr, 6, sizeof(TestStruct), cmpFunc); + } + end_time = clock(); + cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC; + printf("taosSort %d times: %f s\n", times, cpu_time_used); + + const int arraySize = 1000000; + int data1[arraySize]; + int data2[arraySize]; + for (int i = 0; i < arraySize; ++i) { + data1[i] = taosRand(); + data2[i] = data1[i]; + } + start_time = clock(); + taosMergeSort(data1, arraySize, sizeof(int), cmpInt); + end_time = clock(); + cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC; + printf("taosMSort length:%d cost: %f s\n", arraySize, cpu_time_used); + + start_time = clock(); + taosSort(data2, arraySize, sizeof(int), cmpInt); + end_time = clock(); + cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC; + printf("taosSort length:%d cost: %f s\n", arraySize, cpu_time_used); + + for (int i = 0; i < arraySize - 1; i++) { + EXPECT_EQ(data1[i], data2[i]); + ASSERT_LE(data1[i], data1[i+1]); + } +} From 8ebb6e202ed1cdf56df54cef1402643cdde798ea Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 15 Sep 2023 19:26:55 +0800 Subject: [PATCH 02/10] fix:[TD-26323]add macro to control tmq assert & make subscribe transaction no conflicts --- source/dnode/mnode/impl/src/mndConsumer.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f25bd2cffb..7838b967b0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -401,7 +401,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosWLockLatch(&pSub->lock); @@ -499,7 +501,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); // txn guarantees pSub is created if(pSub == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosRLockLatch(&pSub->lock); @@ -510,7 +514,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if(pTopic == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif taosRUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); continue; @@ -649,7 +655,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } From 83d8e3a5244d33d98ca612c6353d447fc0300fff Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 18 Sep 2023 13:46:29 +0800 Subject: [PATCH 03/10] mem exception handle --- include/common/tdataformat.h | 2 +- include/util/talgo.h | 3 ++- include/util/tarray.h | 2 +- source/common/src/tdataformat.c | 10 +++++++--- source/libs/executor/src/dataInserter.c | 4 ++-- source/libs/parser/src/parInsertUtil.c | 4 ++-- source/util/src/talgo.c | 10 +++++++--- source/util/src/tarray.c | 4 ++-- 8 files changed, 24 insertions(+), 15 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index e04bdd1b07..aed1d03fc1 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -108,7 +108,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData); int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow); int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowDestroy(SRow *pRow); -void tRowSort(SArray *aRowP); +int32_t tRowSort(SArray *aRowP); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); diff --git a/include/util/talgo.h b/include/util/talgo.h index 675bb66431..b065ea3705 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -61,8 +61,9 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ * @param numOfElem * @param size * @param comparFn + * @return int32_t 0 for success, other for failure. */ -void taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn); +int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn); /** * binary search, with range support diff --git a/include/util/tarray.h b/include/util/tarray.h index 17fc69cde5..e494f78f48 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -225,7 +225,7 @@ void taosArraySort(SArray* pArray, __compar_fn_t comparFn); * @param pArray * @param compar */ -void taosArrayMSort(SArray* pArray, __compar_fn_t comparFn); +int32_t taosArrayMSort(SArray* pArray, __compar_fn_t comparFn); /** * search the array diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 1f754cd4b8..b4ca8d1b17 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -610,9 +610,13 @@ _exit: return code; } -void tRowSort(SArray *aRowP) { - if (TARRAY_SIZE(aRowP) <= 1) return; - taosArrayMSort(aRowP, tRowPCmprFn); +int32_t tRowSort(SArray *aRowP) { + if (TARRAY_SIZE(aRowP) <= 1) return 0; + int32_t code = taosArrayMSort(aRowP, tRowPCmprFn); + if (code != TSDB_CODE_SUCCESS) { + uError("taosArrayMSort failed caused by %d", code); + } + return code; } int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) { diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index e47cbb7eba..f301ddf4be 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -289,8 +289,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } if (disorderTs) { - tRowSort(tbData.aRowP); - if ((terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) { + if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) || + (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) { goto _end; } } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 79e305989b..3efb5dafcb 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -495,9 +495,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { tColDataSortMerge(pTableCxt->pData->aCol); } else { if (!pTableCxt->ordered) { - tRowSort(pTableCxt->pData->aRowP); + code = tRowSort(pTableCxt->pData->aRowP); } - if (!pTableCxt->ordered || pTableCxt->duplicateTs) { + if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) { code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); } } diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index a39dd4cc99..8d83a70c11 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -312,10 +312,12 @@ static void taosMerge(void *src, int32_t start, int32_t leftend, int32_t end, in } } -static void taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn) { +static int32_t taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, const void *param, + __ext_compar_fn_t comparFn) { // short array sort, instead of merge sort process const int32_t THRESHOLD_SIZE = 6; char *buf = taosMemoryCalloc(1, size); // prepare the swap buffer + if (buf == NULL) return TSDB_CODE_OUT_OF_MEMORY; for (int32_t start = 0; start < numOfElem - 1; start += THRESHOLD_SIZE) { int32_t end = (start + THRESHOLD_SIZE - 1) <= numOfElem - 1 ? (start + THRESHOLD_SIZE - 1) : numOfElem - 1; tInsertSort(src, size, start, end, param, comparFn, buf); @@ -325,6 +327,7 @@ static void taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, cons if (numOfElem > THRESHOLD_SIZE) { int32_t currSize; void *tmp = taosMemoryMalloc(numOfElem * size); + if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY; for (currSize = THRESHOLD_SIZE; currSize <= numOfElem - 1; currSize = 2 * currSize) { int32_t leftStart; @@ -340,6 +343,7 @@ static void taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, cons taosMemoryFreeClear(tmp); } + return 0; } int32_t msortHelper(const void *p1, const void *p2, const void *param) { @@ -348,7 +352,7 @@ int32_t msortHelper(const void *p1, const void *p2, const void *param) { } -void taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) { +int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) { void *param = comparFn; - taosMergeSortHelper(src, numOfElem, size, param, msortHelper); + return taosMergeSortHelper(src, numOfElem, size, param, msortHelper); } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 0a71061c52..a7c28df22b 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -417,8 +417,8 @@ void taosArraySort(SArray* pArray, __compar_fn_t compar) { taosSort(pArray->pData, pArray->size, pArray->elemSize, compar); } -void taosArrayMSort(SArray* pArray, __compar_fn_t compar) { - taosMergeSort(pArray->pData, pArray->size, pArray->elemSize, compar); +int32_t taosArrayMSort(SArray* pArray, __compar_fn_t compar) { + return taosMergeSort(pArray->pData, pArray->size, pArray->elemSize, compar); } void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { From 2c10163634812e64359aaa8f663c47ba26d56991 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 18 Sep 2023 14:19:58 +0800 Subject: [PATCH 04/10] avoid first tag index conflict --- source/dnode/mnode/impl/src/mndStb.c | 161 ++++++++++++----------- source/os/src/osRand.c | 4 +- tests/script/tsim/tagindex/add_index.sim | 10 +- 3 files changed, 94 insertions(+), 81 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index fadaa717a2..0e381a5ac1 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndStb.h" +#include "audit.h" #include "mndDb.h" #include "mndDnode.h" #include "mndIndex.h" @@ -31,7 +32,6 @@ #include "mndUser.h" #include "mndVgroup.h" #include "tname.h" -#include "audit.h" #define STB_VER_NUMBER 1 #define STB_RESERVE_SIZE 64 @@ -858,6 +858,23 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat } return 0; } +static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *tagname) { + char randStr[TSDB_COL_NAME_LEN] = {0}; + int32_t left = TSDB_COL_NAME_LEN - strlen(tagname) - 1; + if (left <= 1) { + sprintf(fullname, "%s.%s", dbname, tagname); + } else { + int8_t start = left < 8 ? 0 : 8; + int8_t end = left >= 24 ? 24 : left - 1; + // gen rand str len [base:end] + // note: ignore rand performance issues + int64_t len = taosRand() % (end - start + 1) + start; + taosRandStr2(randStr, len); + sprintf(fullname, "%s.%s_%s", dbname, tagname, randStr); + } + + return 0; +} static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; @@ -871,10 +888,8 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER; - char randStr[24] = {0}; - taosRandStr2(randStr, tListLen(randStr) - 1); SSchema *pSchema = &(stbObj.pTags[0]); - sprintf(fullIdxName, "%s.%s_%s", pDb->name, pSchema->name, randStr); + mndGenIdxNameForFirstTag(fullIdxName, pDb->name, pSchema->name); SSIdx idx = {0}; if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) { @@ -1066,78 +1081,75 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq return TSDB_CODE_SUCCESS; } -static char* mndAuditFieldTypeStr(int32_t type){ - switch (type) - { - case TSDB_DATA_TYPE_NULL: - return "null"; - case TSDB_DATA_TYPE_BOOL: - return "bool"; - case TSDB_DATA_TYPE_TINYINT: - return "tinyint"; - case TSDB_DATA_TYPE_SMALLINT: - return "smallint"; - case TSDB_DATA_TYPE_INT: - return "int"; - case TSDB_DATA_TYPE_BIGINT: - return "bigint"; - case TSDB_DATA_TYPE_FLOAT: - return "float"; - case TSDB_DATA_TYPE_DOUBLE: - return "double"; - case TSDB_DATA_TYPE_VARCHAR: - return "varchar"; - case TSDB_DATA_TYPE_TIMESTAMP: - return "timestamp"; - case TSDB_DATA_TYPE_NCHAR: - return "nchar"; - case TSDB_DATA_TYPE_UTINYINT: - return "utinyint"; - case TSDB_DATA_TYPE_USMALLINT: - return "usmallint"; - case TSDB_DATA_TYPE_UINT: - return "uint"; - case TSDB_DATA_TYPE_UBIGINT: - return "ubigint"; - case TSDB_DATA_TYPE_JSON: - return "json"; - case TSDB_DATA_TYPE_VARBINARY: - return "varbinary"; - case TSDB_DATA_TYPE_DECIMAL: - return "decimal"; - case TSDB_DATA_TYPE_BLOB: - return "blob"; - case TSDB_DATA_TYPE_MEDIUMBLOB: - return "mediumblob"; - case TSDB_DATA_TYPE_GEOMETRY: - return "geometry"; +static char *mndAuditFieldTypeStr(int32_t type) { + switch (type) { + case TSDB_DATA_TYPE_NULL: + return "null"; + case TSDB_DATA_TYPE_BOOL: + return "bool"; + case TSDB_DATA_TYPE_TINYINT: + return "tinyint"; + case TSDB_DATA_TYPE_SMALLINT: + return "smallint"; + case TSDB_DATA_TYPE_INT: + return "int"; + case TSDB_DATA_TYPE_BIGINT: + return "bigint"; + case TSDB_DATA_TYPE_FLOAT: + return "float"; + case TSDB_DATA_TYPE_DOUBLE: + return "double"; + case TSDB_DATA_TYPE_VARCHAR: + return "varchar"; + case TSDB_DATA_TYPE_TIMESTAMP: + return "timestamp"; + case TSDB_DATA_TYPE_NCHAR: + return "nchar"; + case TSDB_DATA_TYPE_UTINYINT: + return "utinyint"; + case TSDB_DATA_TYPE_USMALLINT: + return "usmallint"; + case TSDB_DATA_TYPE_UINT: + return "uint"; + case TSDB_DATA_TYPE_UBIGINT: + return "ubigint"; + case TSDB_DATA_TYPE_JSON: + return "json"; + case TSDB_DATA_TYPE_VARBINARY: + return "varbinary"; + case TSDB_DATA_TYPE_DECIMAL: + return "decimal"; + case TSDB_DATA_TYPE_BLOB: + return "blob"; + case TSDB_DATA_TYPE_MEDIUMBLOB: + return "mediumblob"; + case TSDB_DATA_TYPE_GEOMETRY: + return "geometry"; - default: - return "error"; + default: + return "error"; } } -static void mndAuditFieldStr(char* detail, SArray *arr, int32_t len, int32_t max){ +static void mndAuditFieldStr(char *detail, SArray *arr, int32_t len, int32_t max) { int32_t detialLen = strlen(detail); int32_t fieldLen = 0; for (int32_t i = 0; i < len; ++i) { SField *pField = taosArrayGet(arr, i); - char field[TSDB_COL_NAME_LEN + 20] = {0}; + char field[TSDB_COL_NAME_LEN + 20] = {0}; fieldLen = strlen(", "); - if(detialLen > 0 && detialLen < max-fieldLen-1) { + if (detialLen > 0 && detialLen < max - fieldLen - 1) { strcat(detail, ", "); detialLen += fieldLen; - } - else{ + } else { break; } sprintf(field, "%s:%s", pField->name, mndAuditFieldTypeStr(pField->type)); fieldLen = strlen(field); - if(detialLen < max-fieldLen-1) { + if (detialLen < max - fieldLen - 1) { strcat(detail, field); detialLen += fieldLen; - } - else{ + } else { break; } } @@ -1252,14 +1264,17 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[AUDIT_DETAIL_MAX] = {0}; - sprintf(detail, "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 ", " - "deleteMark2:%" PRId64 ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, " - "source:%d, suid:%" PRId64 ", tagVer:%d, ttl:%d, " + sprintf(detail, + "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 + ", " + "deleteMark2:%" PRId64 + ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, " + "source:%d, suid:%" PRId64 + ", tagVer:%d, ttl:%d, " "watermark1:%" PRId64 ", watermark2:%" PRId64, - createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, - createReq.deleteMark2, createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags, - createReq.source, createReq.suid, createReq.tagVer, createReq.ttl, - createReq.watermark1, createReq.watermark2); + createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, createReq.deleteMark2, + createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags, createReq.source, + createReq.suid, createReq.tagVer, createReq.ttl, createReq.watermark1, createReq.watermark2); mndAuditFieldStr(detail, createReq.pColumns, createReq.numOfColumns, AUDIT_DETAIL_MAX); mndAuditFieldStr(detail, createReq.pTags, createReq.numOfTags, AUDIT_DETAIL_MAX); @@ -2338,8 +2353,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[2000] = {0}; - sprintf(detail, "alterType:%d, numOfFields:%d, ttl:%d" , - alterReq.alterType, alterReq.numOfFields, alterReq.ttl); + sprintf(detail, "alterType:%d, numOfFields:%d, ttl:%d", alterReq.alterType, alterReq.numOfFields, alterReq.ttl); SName name = {0}; tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -2436,7 +2450,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndDropIdxsByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndDropSmasByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; - if (mndUserRemoveStb(pMnode, pTrans, pStb->name) != 0) goto _OVER; + if (mndUserRemoveStb(pMnode, pTrans, pStb->name) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -2609,8 +2623,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[2000] = {0}; - sprintf(detail, "igNotExists:%d, source:%d" , - dropReq.igNotExists, dropReq.source); + sprintf(detail, "igNotExists:%d, source:%d", dropReq.igNotExists, dropReq.source); SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -3370,7 +3383,7 @@ static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb return p->info.rows; } -static int8_t determineBuildColForWhichDBs(const char* db) { +static int8_t determineBuildColForWhichDBs(const char *db) { int8_t buildWhichDBs; if (!db[0]) buildWhichDBs = BUILD_COL_FOR_ALL_DB; @@ -3388,11 +3401,11 @@ static int8_t determineBuildColForWhichDBs(const char* db) { } static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - uint8_t buildWhichDBs; + uint8_t buildWhichDBs; SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; SStbObj *pStb = NULL; - int32_t numOfRows = 0; + int32_t numOfRows = 0; buildWhichDBs = determineBuildColForWhichDBs(pShow->db); diff --git a/source/os/src/osRand.c b/source/os/src/osRand.c index 9cb6f6e52a..f73f0c3913 100644 --- a/source/os/src/osRand.c +++ b/source/os/src/osRand.c @@ -86,8 +86,8 @@ void taosRandStr(char* str, int32_t size) { } void taosRandStr2(char* str, int32_t size) { - const char* set = "abcdefghijklmnopqrstuvwxyz0123456789"; - int32_t len = 36; + const char* set = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ@"; + int32_t len = strlen(set); for (int32_t i = 0; i < size; ++i) { str[i] = set[taosRand() % len]; diff --git a/tests/script/tsim/tagindex/add_index.sim b/tests/script/tsim/tagindex/add_index.sim index 70771fc63b..b307a79c31 100644 --- a/tests/script/tsim/tagindex/add_index.sim +++ b/tests/script/tsim/tagindex/add_index.sim @@ -292,11 +292,11 @@ if $rows != 1 then return -1 endi -sql drop index $data[0][0] +#sql drop index $data[0][0] -if $rows != 0 then - return -1 -endi +#if $rows != 0 then +#return -1 +#endi sql_error drop index t2 @@ -304,7 +304,7 @@ sql_error drop index t3 -sql create index ti0 on $mtPrefix (t1) +#sql create index ti0 on $mtPrefix (t1) $i = $interval while $i < $limit From c3498fbfe8be8e0fe30820cb1f5f4b4682978573 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 18 Sep 2023 14:35:07 +0800 Subject: [PATCH 05/10] fix:core in race condition for pTq->pExecStore & return if poll too long & fix test cases if submit empty --- source/dnode/vnode/src/tq/tq.c | 60 +++++++++++++------ source/dnode/vnode/src/tq/tqRead.c | 12 ++-- source/dnode/vnode/src/tq/tqSnapshot.c | 2 + source/dnode/vnode/src/tq/tqUtil.c | 5 +- tests/system-test/7-tmq/subscribeDb3.py | 2 +- .../tmqConsFromTsdb1-1ctb-funcNFilter.py | 2 +- .../7-tmq/tmqConsFromTsdb1-1ctb.py | 2 +- .../tmqConsFromTsdb1-mutilVg-mutilCtb.py | 2 +- .../7-tmq/tmqConsFromTsdb1-mutilVg.py | 2 +- tests/system-test/7-tmq/tmqConsFromTsdb1.py | 2 +- 10 files changed, 59 insertions(+), 32 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0bf9cba2dd..bd37d36352 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { } tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); + taosWLockLatch(&pTq->lock); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); code = 0; + taosWUnLockLatch(&pTq->lock); goto end; } // 2. check consumer-vg assignment status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != req.consumerId) { tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, req.consumerId, vgId, req.subKey, pHandle->consumerId); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; goto end; } @@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. tqUnregisterPushHandle(pTq, pHandle); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); end: rsp.code = code; @@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); // 1. find handle + taosRLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; + taosRUnLockLatch(&pTq->lock); return -1; } // 2. check re-balance status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, vgId, req.subKey, pHandle->consumerId); @@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg bool exec = tqIsHandleExec(pHandle); if(exec){ - tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); taosWUnLockLatch(&pTq->lock); taosMsleep(10); @@ -667,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = NULL; while (1) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) { + if (pHandle) { + break; + } + taosRLockLatch(&pTq->lock); + ret = tqMetaGetHandle(pTq, req.subKey); + taosRUnLockLatch(&pTq->lock); + + if (ret < 0) { break; } } @@ -687,21 +697,33 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDestroyTqHandle(&handle); goto end; } - ret = tqMetaSaveHandle(pTq, req.subKey, &handle); - } else { taosWLockLatch(&pTq->lock); - - if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); - } else { - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); - tqUnregisterPushHandle(pTq, pHandle); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - } + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); taosWUnLockLatch(&pTq->lock); + } else { + while(1){ + taosWLockLatch(&pTq->lock); + bool exec = tqIsHandleExec(pHandle); + if(exec){ + tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId, + pHandle->subKey, pHandle); + taosWUnLockLatch(&pTq->lock); + taosMsleep(10); + continue; + } + if (pHandle->consumerId == req.newConsumerId) { // do nothing + tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); + } else { + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); + tqUnregisterPushHandle(pTq, pHandle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + } + taosWUnLockLatch(&pTq->lock); + break; + } } end: diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cadbc70c6f..6a4650eb9d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -367,7 +367,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; -// uint64_t st = taosGetTimestampMs(); + uint64_t st = taosGetTimestampMs(); while (1) { SArray* pBlockList = pReader->submit.aSubmitTbData; if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { @@ -442,9 +442,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { pReader->msg.msgStr = NULL; -// if(taosGetTimestampMs() - st > 5){ -// return false; -// } + if(taosGetTimestampMs() - st > 1000){ + return false; + } } } @@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t vgId = TD_VID(pTq->pVnode); // update the table list for each consumer handle + taosWLockLatch(&pTq->lock); while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { @@ -1116,6 +1117,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); taosHashCancelIterate(pTq->pHandle, pIter); + taosWUnLockLatch(&pTq->lock); + return ret; } tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL); @@ -1125,6 +1128,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } + taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index 5c0649c109..16cbacd59d 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -200,7 +200,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tDecodeSTqHandle(pDecoder, &handle); if (code) goto _err; + taosWLockLatch(&pTq->lock); code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + taosWUnLockLatch(&pTq->lock); if (code < 0) goto _err; tDecoderClear(pDecoder); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 62ef06fec2..73e7f9d44a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -218,7 +218,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version; -// uint64_t st = taosGetTimestampMs(); + uint64_t st = taosGetTimestampMs(); int totalRows = 0; while (1) { // int32_t savedEpoch = atomic_load_32(&pHandle->epoch); @@ -265,8 +265,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } -// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) { - if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { + if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index b66334a6a6..6ddd829c4c 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows < 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py index 6a03f0f751..117c3ce637 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py @@ -218,7 +218,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py index c11159c6e5..2864240441 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py index 439845aa54..d8606efe58 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py index 53ff020b08..05aa82c929 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index 4bb6cf463f..dcaa6ceb7c 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -217,7 +217,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)): + if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) From c33ef4ce88cbf625d181717131d7a612f8314f4c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 15:14:51 +0800 Subject: [PATCH 06/10] fix(stream): add null check --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 3 +-- source/libs/stream/src/streamExec.c | 7 +++---- source/libs/stream/src/streamTask.c | 4 ++-- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2d70bb1e1c..a19ebd67b0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -712,7 +712,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); -int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId); +int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e639e272fa..f5909eb0fe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1086,8 +1086,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); -// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); - streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamMetaReleaseTask(pMeta, pTask); return -1; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 91c46c8ad9..969b547d71 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -303,7 +303,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); // 1. free it and remove fill-history task from disk meta-store - streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 2. save to disk taosWLockLatch(&pMeta->lock); @@ -365,8 +365,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); // 4. free it and remove fill-history task from disk meta-store -// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); - streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; @@ -411,7 +410,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); } else { // drop fill-history task - streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id); } return code; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ba8578f98e..d2e306fa01 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -644,7 +644,7 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) { return status; } -int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) { +int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) { SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -656,7 +656,7 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT pReq->streamId = pTaskId->streamId; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; - int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg); + int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); if (code != TSDB_CODE_SUCCESS) { qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); return code; From 36316c79feec09cbd5ce6574ad1c23a42b18bfca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 17:41:58 +0800 Subject: [PATCH 07/10] doc: update docs. --- docs/en/12-taos-sql/22-meta.md | 164 +++++++++++++++++---------------- docs/zh/12-taos-sql/22-meta.md | 155 ++++++++++++++++--------------- 2 files changed, 170 insertions(+), 149 deletions(-) diff --git a/docs/en/12-taos-sql/22-meta.md b/docs/en/12-taos-sql/22-meta.md index a9eec511c5..fad479d9d3 100644 --- a/docs/en/12-taos-sql/22-meta.md +++ b/docs/en/12-taos-sql/22-meta.md @@ -32,10 +32,10 @@ Provides information about dnodes. Similar to SHOW DNODES. Users whose SYSINFO a | --- | :------------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- | | 1 | vnodes | SMALLINT | Current number of vnodes on the dnode. It should be noted that `vnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 2 | support_vnodes | SMALLINT | Maximum number of vnodes on the dnode | -| 3 | status | BINARY(10) | Current status | -| 4 | note | BINARY(256) | Reason for going offline or other information | +| 3 | status | VARCHAR(10) | Current status | +| 4 | note | VARCHAR(256) | Reason for going offline or other information | | 5 | id | SMALLINT | Dnode ID | -| 6 | endpoint | BINARY(134) | Dnode endpoint | +| 6 | endpoint | VARCHAR(134) | Dnode endpoint | | 7 | create | TIMESTAMP | Creation time | ## INS_MNODES @@ -45,8 +45,8 @@ Provides information about mnodes. Similar to SHOW MNODES. Users whose SYSINFO a | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | ------------------------------------------ | | 1 | id | SMALLINT | Mnode ID | -| 2 | endpoint | BINARY(134) | Mnode endpoint | -| 3 | role | BINARY(10) | Current role | +| 2 | endpoint | VARCHAR(134) | Mnode endpoint | +| 3 | role | VARCHAR(10) | Current role | | 4 | role_time | TIMESTAMP | Time at which the current role was assumed | | 5 | create_time | TIMESTAMP | Creation time | @@ -57,7 +57,17 @@ Provides information about qnodes. Similar to SHOW QNODES. Users whose SYSINFO a | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | --------------- | | 1 | id | SMALLINT | Qnode ID | -| 2 | endpoint | BINARY(134) | Qnode endpoint | +| 2 | endpoint | VARCHAR(134) | Qnode endpoint | +| 3 | create_time | TIMESTAMP | Creation time | + +## INS_SNODES + +Provides information about snodes. Similar to SHOW SNODES. Users whose SYSINFO attribute is 0 can't view this table. + +| # | **Column** | **Data Type** | **Description** | +| --- | :---------: | ------------- | --------------- | +| 1 | id | SMALLINT | Snode ID | +| 2 | endpoint | VARCHAR(134) | Snode endpoint | | 3 | create_time | TIMESTAMP | Creation time | ## INS_CLUSTER @@ -67,7 +77,7 @@ Provides information about the cluster. Users whose SYSINFO attribute is 0 can't | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | --------------- | | 1 | id | BIGINT | Cluster ID | -| 2 | name | BINARY(134) | Cluster name | +| 2 | name | VARCHAR(134) | Cluster name | | 3 | create_time | TIMESTAMP | Creation time | ## INS_DATABASES @@ -111,15 +121,15 @@ Provides information about user-defined functions. | # | **Column** | **Data Type** | **Description** | | --- | :-----------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 1 | name | BINARY(64) | Function name | -| 2 | comment | BINARY(255) | Function description. It should be noted that `comment` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 1 | name | VARCHAR(64) | Function name | +| 2 | comment | VARCHAR(255) | Function description. It should be noted that `comment` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 3 | aggregate | INT | Whether the UDF is an aggregate function. It should be noted that `aggregate` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 4 | output_type | BINARY(31) | Output data type | +| 4 | output_type | VARCHAR(31) | Output data type | | 5 | create_time | TIMESTAMP | Creation time | | 6 | code_len | INT | Length of the source code | | 7 | bufsize | INT | Buffer size | -| 8 | func_language | BINARY(31) | UDF programming language | -| 9 | func_body | BINARY(16384) | UDF function body | +| 8 | func_language | VARCHAR(31) | UDF programming language | +| 9 | func_body | VARCHAR(16384) | UDF function body | | 10 | func_version | INT | UDF function version. starting from 0. Increasing by 1 each time it is updated | ## INS_INDEXES @@ -128,12 +138,12 @@ Provides information about user-created indices. Similar to SHOW INDEX. | # | **Column** | **Data Type** | **Description** | | --- | :--------------: | ------------- | --------------------------------------------------------------------- | -| 1 | db_name | BINARY(32) | Database containing the table with the specified index | -| 2 | table_name | BINARY(192) | Table containing the specified index | -| 3 | index_name | BINARY(192) | Index name | -| 4 | db_name | BINARY(64) | Index column | -| 5 | index_type | BINARY(10) | SMA or tag index | -| 6 | index_extensions | BINARY(256) | Other information For SMA/tag indices, this shows a list of functions | +| 1 | db_name | VARCHAR(32) | Database containing the table with the specified index | +| 2 | table_name | VARCHAR(192) | Table containing the specified index | +| 3 | index_name | VARCHAR(192) | Index name | +| 4 | db_name | VARCHAR(64) | Index column | +| 5 | index_type | VARCHAR(10) | SMA or tag index | +| 6 | index_extensions | VARCHAR(256) | Other information For SMA/tag indices, this shows a list of functions | ## INS_STABLES @@ -141,16 +151,16 @@ Provides information about supertables. | # | **Column** | **Data Type** | **Description** | | --- | :-----------: | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 1 | stable_name | BINARY(192) | Supertable name | -| 2 | db_name | BINARY(64) | All databases in the supertable | +| 1 | stable_name | VARCHAR(192) | Supertable name | +| 2 | db_name | VARCHAR(64) | All databases in the supertable | | 3 | create_time | TIMESTAMP | Creation time | | 4 | columns | INT | Number of columns | | 5 | tags | INT | Number of tags. It should be noted that `tags` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 6 | last_update | TIMESTAMP | Last updated time | -| 7 | table_comment | BINARY(1024) | Table description | -| 8 | watermark | BINARY(64) | Window closing time. It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 9 | max_delay | BINARY(64) | Maximum delay for pushing stream processing results. It should be noted that `max_delay` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 10 | rollup | BINARY(128) | Rollup aggregate function. It should be noted that `rollup` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 7 | table_comment | VARCHAR(1024) | Table description | +| 8 | watermark | VARCHAR(64) | Window closing time. It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 9 | max_delay | VARCHAR(64) | Maximum delay for pushing stream processing results. It should be noted that `max_delay` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 10 | rollup | VARCHAR(128) | Rollup aggregate function. It should be noted that `rollup` is a TDengine keyword and needs to be escaped with ` when used as a column name. | ## INS_TABLES @@ -158,37 +168,37 @@ Provides information about standard tables and subtables. | # | **Column** | **Data Type** | **Description** | | --- | :-----------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------- | -| 1 | table_name | BINARY(192) | Table name | -| 2 | db_name | BINARY(64) | Database name | +| 1 | table_name | VARCHAR(192) | Table name | +| 2 | db_name | VARCHAR(64) | Database name | | 3 | create_time | TIMESTAMP | Creation time | | 4 | columns | INT | Number of columns | -| 5 | stable_name | BINARY(192) | Supertable name | +| 5 | stable_name | VARCHAR(192) | Supertable name | | 6 | uid | BIGINT | Table ID | | 7 | vgroup_id | INT | Vgroup ID | | 8 | ttl | INT | Table time-to-live. It should be noted that `ttl` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 9 | table_comment | BINARY(1024) | Table description | -| 10 | type | BINARY(20) | Table type | +| 9 | table_comment | VARCHAR(1024) | Table description | +| 10 | type | VARCHAR(20) | Table type | ## INS_TAGS | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | --------------- | -| 1 | table_name | BINARY(192) | Table name | -| 2 | db_name | BINARY(64) | Database name | -| 3 | stable_name | BINARY(192) | Supertable name | -| 4 | tag_name | BINARY(64) | Tag name | -| 5 | tag_type | BINARY(64) | Tag type | -| 6 | tag_value | BINARY(16384) | Tag value | +| 1 | table_name | VARCHAR(192) | Table name | +| 2 | db_name | VARCHAR(64) | Database name | +| 3 | stable_name | VARCHAR(192) | Supertable name | +| 4 | tag_name | VARCHAR(64) | Tag name | +| 5 | tag_type | VARCHAR(64) | Tag type | +| 6 | tag_value | VARCHAR(16384) | Tag value | ## INS_COLUMNS | # | **Column** | **Data Type** | **Description** | | --- | :-----------: | ------------- | ---------------- | -| 1 | table_name | BINARY(192) | Table name | -| 2 | db_name | BINARY(64) | Database name | -| 3 | table_type | BINARY(21) | Table type | -| 4 | col_name | BINARY(64) | Column name | -| 5 | col_type | BINARY(32) | Column type | +| 1 | table_name | VARCHAR(192) | Table name | +| 2 | db_name | VARCHAR(64) | Database name | +| 3 | table_type | VARCHAR(21) | Table type | +| 4 | col_name | VARCHAR(64) | Column name | +| 5 | col_type | VARCHAR(32) | Column type | | 6 | col_length | INT | Column length | | 7 | col_precision | INT | Column precision | | 8 | col_scale | INT | Column scale | @@ -200,8 +210,8 @@ Provides information about TDengine users. Users whose SYSINFO attribute is 0 ca | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | ---------------- | -| 1 | user_name | BINARY(23) | User name | -| 2 | privilege | BINARY(256) | User permissions | +| 1 | user_name | VARCHAR(23) | User name | +| 2 | privilege | VARCHAR(256) | User permissions | | 3 | create_time | TIMESTAMP | Creation time | ## INS_GRANTS @@ -210,20 +220,20 @@ Provides information about TDengine Enterprise Edition permissions. Users whose | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 1 | version | BINARY(9) | Whether the deployment is a licensed or trial version | -| 2 | cpu_cores | BINARY(9) | CPU cores included in license | -| 3 | dnodes | BINARY(10) | Dnodes included in license. It should be noted that `dnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 4 | streams | BINARY(10) | Streams included in license. It should be noted that `streams` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 5 | users | BINARY(10) | Users included in license. It should be noted that `users` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 6 | accounts | BINARY(10) | Accounts included in license. It should be noted that `accounts` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 7 | storage | BINARY(21) | Storage space included in license. It should be noted that `storage` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 8 | connections | BINARY(21) | Client connections included in license. It should be noted that `connections` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 9 | databases | BINARY(11) | Databases included in license. It should be noted that `databases` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 10 | speed | BINARY(9) | Write speed specified in license (data points per second) | -| 11 | querytime | BINARY(9) | Total query time specified in license | -| 12 | timeseries | BINARY(21) | Number of metrics included in license | -| 13 | expired | BINARY(5) | Whether the license has expired | -| 14 | expire_time | BINARY(19) | When the trial period expires | +| 1 | version | VARCHAR(9) | Whether the deployment is a licensed or trial version | +| 2 | cpu_cores | VARCHAR(9) | CPU cores included in license | +| 3 | dnodes | VARCHAR(10) | Dnodes included in license. It should be noted that `dnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 4 | streams | VARCHAR(10) | Streams included in license. It should be noted that `streams` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 5 | users | VARCHAR(10) | Users included in license. It should be noted that `users` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 6 | accounts | VARCHAR(10) | Accounts included in license. It should be noted that `accounts` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 7 | storage | VARCHAR(21) | Storage space included in license. It should be noted that `storage` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 8 | connections | VARCHAR(21) | Client connections included in license. It should be noted that `connections` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 9 | databases | VARCHAR(11) | Databases included in license. It should be noted that `databases` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 10 | speed | VARCHAR(9) | Write speed specified in license (data points per second) | +| 11 | querytime | VARCHAR(9) | Total query time specified in license | +| 12 | timeseries | VARCHAR(21) | Number of metrics included in license | +| 13 | expired | VARCHAR(5) | Whether the license has expired | +| 14 | expire_time | VARCHAR(19) | When the trial period expires | ## INS_VGROUPS @@ -232,15 +242,15 @@ Provides information about vgroups. Users whose SYSINFO attribute is 0 can't vie | # | **Column** | **Data Type** | **Description** | | --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------------------- | | 1 | vgroup_id | INT | Vgroup ID | -| 2 | db_name | BINARY(32) | Database name | +| 2 | db_name | VARCHAR(32) | Database name | | 3 | tables | INT | Tables in vgroup. It should be noted that `tables` is a TDengine keyword and needs to be escaped with ` when used as a column name. | -| 4 | status | BINARY(10) | Vgroup status | +| 4 | status | VARCHAR(10) | Vgroup status | | 5 | v1_dnode | INT | Dnode ID of first vgroup member | -| 6 | v1_status | BINARY(10) | Status of first vgroup member | +| 6 | v1_status | VARCHAR(10) | Status of first vgroup member | | 7 | v2_dnode | INT | Dnode ID of second vgroup member | -| 8 | v2_status | BINARY(10) | Status of second vgroup member | +| 8 | v2_status | VARCHAR(10) | Status of second vgroup member | | 9 | v3_dnode | INT | Dnode ID of third vgroup member | -| 10 | v3_status | BINARY(10) | Status of third vgroup member | +| 10 | v3_status | VARCHAR(10) | Status of third vgroup member | | 11 | nfiles | INT | Number of data and metadata files in the vgroup | | 12 | file_size | INT | Size of the data and metadata files in the vgroup | | 13 | tsma | TINYINT | Whether time-range-wise SMA is enabled. 1 means enabled; 0 means disabled. | @@ -251,8 +261,8 @@ Provides system configuration information. | # | **Column** | **Data Type** | **Description** | | --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------- | -| 1 | name | BINARY(32) | Parameter | -| 2 | value | BINARY(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 1 | name | VARCHAR(32) | Parameter | +| 2 | value | VARCHAR(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. | ## INS_DNODE_VARIABLES @@ -261,40 +271,40 @@ Provides dnode configuration information. Users whose SYSINFO attribute is 0 can | # | **Column** | **Data Type** | **Description** | | --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------- | | 1 | dnode_id | INT | Dnode ID | -| 2 | name | BINARY(32) | Parameter | -| 3 | value | BINARY(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. | +| 2 | name | VARCHAR(32) | Parameter | +| 3 | value | VARCHAR(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. | ## INS_TOPICS | # | **Column** | **Data Type** | **Description** | | --- | :---------: | ------------- | -------------------------------------- | -| 1 | topic_name | BINARY(192) | Topic name | -| 2 | db_name | BINARY(64) | Database for the topic | +| 1 | topic_name | VARCHAR(192) | Topic name | +| 2 | db_name | VARCHAR(64) | Database for the topic | | 3 | create_time | TIMESTAMP | Creation time | -| 4 | sql | BINARY(1024) | SQL statement used to create the topic | +| 4 | sql | VARCHAR(1024) | SQL statement used to create the topic | ## INS_SUBSCRIPTIONS | # | **Column** | **Data Type** | **Description** | | --- | :------------: | ------------- | --------------------------- | -| 1 | topic_name | BINARY(204) | Subscribed topic | -| 2 | consumer_group | BINARY(193) | Subscribed consumer group | +| 1 | topic_name | VARCHAR(204) | Subscribed topic | +| 2 | consumer_group | VARCHAR(193) | Subscribed consumer group | | 3 | vgroup_id | INT | Vgroup ID for the consumer | | 4 | consumer_id | BIGINT | Consumer ID | -| 5 | offset | BINARY(64) | Consumption progress | +| 5 | offset | VARCHAR(64) | Consumption progress | | 6 | rows | BIGINT | Number of consumption items | ## INS_STREAMS | # | **Column** | **Data Type** | **Description** | | --- | :----------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 1 | stream_name | BINARY(64) | Stream name | +| 1 | stream_name | VARCHAR(64) | Stream name | | 2 | create_time | TIMESTAMP | Creation time | -| 3 | sql | BINARY(1024) | SQL statement used to create the stream | -| 4 | status | BINARY(20) | Current status | -| 5 | source_db | BINARY(64) | Source database | -| 6 | target_db | BINARY(64) | Target database | -| 7 | target_table | BINARY(192) | Target table | +| 3 | sql | VARCHAR(1024) | SQL statement used to create the stream | +| 4 | status | VARCHAR(20) | Current status | +| 5 | source_db | VARCHAR(64) | Source database | +| 6 | target_db | VARCHAR(64) | Target database | +| 7 | target_table | VARCHAR(192) | Target table | | 8 | watermark | BIGINT | Watermark (see stream processing documentation). It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 9 | trigger | INT | Method of triggering the result push (see stream processing documentation). It should be noted that `trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | diff --git a/docs/zh/12-taos-sql/22-meta.md b/docs/zh/12-taos-sql/22-meta.md index 3d124db51f..db53dd462b 100644 --- a/docs/zh/12-taos-sql/22-meta.md +++ b/docs/zh/12-taos-sql/22-meta.md @@ -57,9 +57,20 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :---------: | ------------ | ------------ | | 1 | id | SMALLINT | qnode id | -| 2 | endpoint | BINARY(134) | qnode 的地址 | +| 2 | endpoint | VARCHAR(134) | qnode 的地址 | | 3 | create_time | TIMESTAMP | 创建时间 | +## INS_SNODES + +当前系统中 SNODE 的信息。也可以使用 SHOW SNODES 来查询这些信息。SYSINFO 属性为 0 的用户不能查看此表。 + +| # | **列名** | **数据类型** | **说明** | +| --- | :---------: | ------------ | ------------ | +| 1 | id | SMALLINT | snode id | +| 2 | endpoint | VARCHAR(134) | snode 的地址 | +| 3 | create_time | TIMESTAMP | 创建时间 | + + ## INS_CLUSTER 存储集群相关信息。 SYSINFO 属性为 0 的用户不能查看此表。 @@ -67,7 +78,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :---------: | ------------ | ---------- | | 1 | id | BIGINT | cluster id | -| 2 | name | BINARY(134) | 集群名称 | +| 2 | name | VARCHAR(134) | 集群名称 | | 3 | create_time | TIMESTAMP | 创建时间 | ## INS_DATABASES @@ -111,15 +122,15 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :-----------: | ------------- | --------------------------------------------------------------------------------------------- | -| 1 | name | BINARY(64) | 函数名 | -| 2 | comment | BINARY(255) | 补充说明。需要注意,`comment` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 1 | name | VARCHAR(64) | 函数名 | +| 2 | comment | VARCHAR(255) | 补充说明。需要注意,`comment` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 3 | aggregate | INT | 是否为聚合函数。需要注意,`aggregate` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 4 | output_type | BINARY(31) | 输出类型 | +| 4 | output_type | VARCHAR(31) | 输出类型 | | 5 | create_time | TIMESTAMP | 创建时间 | | 6 | code_len | INT | 代码长度 | | 7 | bufsize | INT | buffer 大小 | -| 8 | func_language | BINARY(31) | 自定义函数编程语言 | -| 9 | func_body | BINARY(16384) | 函数体定义 | +| 8 | func_language | VARCHAR(31) | 自定义函数编程语言 | +| 9 | func_body | VARCHAR(16384) | 函数体定义 | | 10 | func_version | INT | 函数版本号。初始版本为0,每次替换更新,版本号加1。 | @@ -129,12 +140,12 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :--------------: | ------------ | ------------------------------------------------------- | -| 1 | db_name | BINARY(32) | 包含此索引的表所在的数据库名 | -| 2 | table_name | BINARY(192) | 包含此索引的表的名称 | -| 3 | index_name | BINARY(192) | 索引名 | -| 4 | column_name | BINARY(64) | 建索引的列的列名 | -| 5 | index_type | BINARY(10) | 目前有 SMA 和 tag | -| 6 | index_extensions | BINARY(256) | 索引的额外信息。对 SMA/tag 类型的索引,是函数名的列表。 | +| 1 | db_name | VARCHAR(32) | 包含此索引的表所在的数据库名 | +| 2 | table_name | VARCHAR(192) | 包含此索引的表的名称 | +| 3 | index_name | VARCHAR(192) | 索引名 | +| 4 | column_name | VARCHAR(64) | 建索引的列的列名 | +| 5 | index_type | VARCHAR(10) | 目前有 SMA 和 tag | +| 6 | index_extensions | VARCHAR(256) | 索引的额外信息。对 SMA/tag 类型的索引,是函数名的列表。 | ## INS_STABLES @@ -142,16 +153,16 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :-----------: | ------------ | ----------------------------------------------------------------------------------------------------- | -| 1 | stable_name | BINARY(192) | 超级表表名 | -| 2 | db_name | BINARY(64) | 超级表所在的数据库的名称 | +| 1 | stable_name | VARCHAR(192) | 超级表表名 | +| 2 | db_name | VARCHAR(64) | 超级表所在的数据库的名称 | | 3 | create_time | TIMESTAMP | 创建时间 | | 4 | columns | INT | 列数目 | | 5 | tags | INT | 标签数目。需要注意,`tags` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 6 | last_update | TIMESTAMP | 最后更新时间 | -| 7 | table_comment | BINARY(1024) | 表注释 | -| 8 | watermark | BINARY(64) | 窗口的关闭时间。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 9 | max_delay | BINARY(64) | 推送计算结果的最大延迟。需要注意,`max_delay` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 10 | rollup | BINARY(128) | rollup 聚合函数。需要注意,`rollup` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 7 | table_comment | VARCHAR(1024) | 表注释 | +| 8 | watermark | VARCHAR(64) | 窗口的关闭时间。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 9 | max_delay | VARCHAR(64) | 推送计算结果的最大延迟。需要注意,`max_delay` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 10 | rollup | VARCHAR(128) | rollup 聚合函数。需要注意,`rollup` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | ## INS_TABLES @@ -159,37 +170,37 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :-----------: | ------------ | ------------------------------------------------------------------------------------- | -| 1 | table_name | BINARY(192) | 表名 | -| 2 | db_name | BINARY(64) | 数据库名 | +| 1 | table_name | VARCHAR(192) | 表名 | +| 2 | db_name | VARCHAR(64) | 数据库名 | | 3 | create_time | TIMESTAMP | 创建时间 | | 4 | columns | INT | 列数目 | -| 5 | stable_name | BINARY(192) | 所属的超级表表名 | +| 5 | stable_name | VARCHAR(192) | 所属的超级表表名 | | 6 | uid | BIGINT | 表 id | | 7 | vgroup_id | INT | vgroup id | | 8 | ttl | INT | 表的生命周期。需要注意,`ttl` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 9 | table_comment | BINARY(1024) | 表注释 | -| 10 | type | BINARY(21) | 表类型 | +| 9 | table_comment | VARCHAR(1024) | 表注释 | +| 10 | type | VARCHAR(21) | 表类型 | ## INS_TAGS | # | **列名** | **数据类型** | **说明** | | --- | :---------: | ------------- | ---------------------- | -| 1 | table_name | BINARY(192) | 表名 | -| 2 | db_name | BINARY(64) | 该表所在的数据库的名称 | -| 3 | stable_name | BINARY(192) | 所属的超级表表名 | -| 4 | tag_name | BINARY(64) | tag 的名称 | -| 5 | tag_type | BINARY(64) | tag 的类型 | -| 6 | tag_value | BINARY(16384) | tag 的值 | +| 1 | table_name | VARCHAR(192) | 表名 | +| 2 | db_name | VARCHAR(64) | 该表所在的数据库的名称 | +| 3 | stable_name | VARCHAR(192) | 所属的超级表表名 | +| 4 | tag_name | VARCHAR(64) | tag 的名称 | +| 5 | tag_type | VARCHAR(64) | tag 的类型 | +| 6 | tag_value | VARCHAR(16384) | tag 的值 | ## INS_COLUMNS | # | **列名** | **数据类型** | **说明** | | --- | :-----------: | ------------ | ---------------------- | -| 1 | table_name | BINARY(192) | 表名 | -| 2 | db_name | BINARY(64) | 该表所在的数据库的名称 | -| 3 | table_type | BINARY(21) | 表类型 | -| 4 | col_name | BINARY(64) | 列 的名称 | -| 5 | col_type | BINARY(32) | 列 的类型 | +| 1 | table_name | VARCHAR(192) | 表名 | +| 2 | db_name | VARCHAR(64) | 该表所在的数据库的名称 | +| 3 | table_type | VARCHAR(21) | 表类型 | +| 4 | col_name | VARCHAR(64) | 列 的名称 | +| 5 | col_type | VARCHAR(32) | 列 的类型 | | 6 | col_length | INT | 列 的长度 | | 7 | col_precision | INT | 列 的精度 | | 8 | col_scale | INT | 列 的比例 | @@ -201,8 +212,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :---------: | ------------ | -------- | -| 1 | user_name | BINARY(23) | 用户名 | -| 2 | privilege | BINARY(256) | 权限 | +| 1 | user_name | VARCHAR(23) | 用户名 | +| 2 | privilege | VARCHAR(256) | 权限 | | 3 | create_time | TIMESTAMP | 创建时间 | ## INS_GRANTS @@ -211,20 +222,20 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :---------: | ------------ | --------------------------------------------------------------------------------------------------------- | -| 1 | version | BINARY(9) | 企业版授权说明:official(官方授权的)/trial(试用的) | -| 2 | cpu_cores | BINARY(9) | 授权使用的 CPU 核心数量 | -| 3 | dnodes | BINARY(10) | 授权使用的 dnode 节点数量。需要注意,`dnodes` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 4 | streams | BINARY(10) | 授权创建的流数量。需要注意,`streams` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 5 | users | BINARY(10) | 授权创建的用户数量。需要注意,`users` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 6 | accounts | BINARY(10) | 授权创建的帐户数量。需要注意,`accounts` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 7 | storage | BINARY(21) | 授权使用的存储空间大小。需要注意,`storage` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 8 | connections | BINARY(21) | 授权使用的客户端连接数量。需要注意,`connections` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 9 | databases | BINARY(11) | 授权使用的数据库数量。需要注意,`databases` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 10 | speed | BINARY(9) | 授权使用的数据点每秒写入数量 | -| 11 | querytime | BINARY(9) | 授权使用的查询总时长 | -| 12 | timeseries | BINARY(21) | 授权使用的测点数量 | -| 13 | expired | BINARY(5) | 是否到期,true:到期,false:未到期 | -| 14 | expire_time | BINARY(19) | 试用期到期时间 | +| 1 | version | VARCHAR(9) | 企业版授权说明:official(官方授权的)/trial(试用的) | +| 2 | cpu_cores | VARCHAR(9) | 授权使用的 CPU 核心数量 | +| 3 | dnodes | VARCHAR(10) | 授权使用的 dnode 节点数量。需要注意,`dnodes` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 4 | streams | VARCHAR(10) | 授权创建的流数量。需要注意,`streams` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 5 | users | VARCHAR(10) | 授权创建的用户数量。需要注意,`users` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 6 | accounts | VARCHAR(10) | 授权创建的帐户数量。需要注意,`accounts` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 7 | storage | VARCHAR(21) | 授权使用的存储空间大小。需要注意,`storage` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 8 | connections | VARCHAR(21) | 授权使用的客户端连接数量。需要注意,`connections` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 9 | databases | VARCHAR(11) | 授权使用的数据库数量。需要注意,`databases` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 10 | speed | VARCHAR(9) | 授权使用的数据点每秒写入数量 | +| 11 | querytime | VARCHAR(9) | 授权使用的查询总时长 | +| 12 | timeseries | VARCHAR(21) | 授权使用的测点数量 | +| 13 | expired | VARCHAR(5) | 是否到期,true:到期,false:未到期 | +| 14 | expire_time | VARCHAR(19) | 试用期到期时间 | ## INS_VGROUPS @@ -233,15 +244,15 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :-------: | ------------ | ------------------------------------------------------------------------------------------------ | | 1 | vgroup_id | INT | vgroup id | -| 2 | db_name | BINARY(32) | 数据库名 | +| 2 | db_name | VARCHAR(32) | 数据库名 | | 3 | tables | INT | 此 vgroup 内有多少表。需要注意,`tables` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | -| 4 | status | BINARY(10) | 此 vgroup 的状态 | +| 4 | status | VARCHAR(10) | 此 vgroup 的状态 | | 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id | -| 6 | v1_status | BINARY(10) | 第一个成员的状态 | +| 6 | v1_status | VARCHAR(10) | 第一个成员的状态 | | 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id | -| 8 | v2_status | BINARY(10) | 第二个成员的状态 | +| 8 | v2_status | VARCHAR(10) | 第二个成员的状态 | | 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id | -| 10 | v3_status | BINARY(10) | 第三个成员的状态 | +| 10 | v3_status | VARCHAR(10) | 第三个成员的状态 | | 11 | nfiles | INT | 此 vgroup 中数据/元数据文件的数量 | | 12 | file_size | INT | 此 vgroup 中数据/元数据文件的大小 | | 13 | tsma | TINYINT | 此 vgroup 是否专用于 Time-range-wise SMA,1: 是, 0: 否 | @@ -252,8 +263,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :------: | ------------ | --------------------------------------------------------------------------------------- | -| 1 | name | BINARY(32) | 配置项名称 | -| 2 | value | BINARY(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 1 | name | VARCHAR(32) | 配置项名称 | +| 2 | value | VARCHAR(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | ## INS_DNODE_VARIABLES @@ -262,40 +273,40 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 | # | **列名** | **数据类型** | **说明** | | --- | :------: | ------------ | --------------------------------------------------------------------------------------- | | 1 | dnode_id | INT | dnode 的 ID | -| 2 | name | BINARY(32) | 配置项名称 | -| 3 | value | BINARY(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | +| 2 | name | VARCHAR(32) | 配置项名称 | +| 3 | value | VARCHAR(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | ## INS_TOPICS | # | **列名** | **数据类型** | **说明** | | --- | :---------: | ------------ | ------------------------------ | -| 1 | topic_name | BINARY(192) | topic 名称 | -| 2 | db_name | BINARY(64) | topic 相关的 DB | +| 1 | topic_name | VARCHAR(192) | topic 名称 | +| 2 | db_name | VARCHAR(64) | topic 相关的 DB | | 3 | create_time | TIMESTAMP | topic 的 创建时间 | -| 4 | sql | BINARY(1024) | 创建该 topic 时所用的 SQL 语句 | +| 4 | sql | VARCHAR(1024) | 创建该 topic 时所用的 SQL 语句 | ## INS_SUBSCRIPTIONS | # | **列名** | **数据类型** | **说明** | | --- | :------------: | ------------ | ------------------------ | -| 1 | topic_name | BINARY(204) | 被订阅的 topic | -| 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | +| 1 | topic_name | VARCHAR(204) | 被订阅的 topic | +| 2 | consumer_group | VARCHAR(193) | 订阅者的消费者组 | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 4 | consumer_id | BIGINT | 消费者的唯一 id | -| 5 | offset | BINARY(64) | 消费者的消费进度 | +| 5 | offset | VARCHAR(64) | 消费者的消费进度 | | 6 | rows | BIGINT | 消费者的消费的数据条数 | ## INS_STREAMS | # | **列名** | **数据类型** | **说明** | | --- | :----------: | ------------ | -------------------------------------------------------------------------------------------------------------------- | -| 1 | stream_name | BINARY(64) | 流计算名称 | +| 1 | stream_name | VARCHAR(64) | 流计算名称 | | 2 | create_time | TIMESTAMP | 创建时间 | -| 3 | sql | BINARY(1024) | 创建流计算时提供的 SQL 语句 | -| 4 | status | BINARY(20) | 流当前状态 | -| 5 | source_db | BINARY(64) | 源数据库 | -| 6 | target_db | BINARY(64) | 目的数据库 | -| 7 | target_table | BINARY(192) | 流计算写入的目标表 | +| 3 | sql | VARCHAR(1024) | 创建流计算时提供的 SQL 语句 | +| 4 | status | VARCHAR(20) | 流当前状态 | +| 5 | source_db | VARCHAR(64) | 源数据库 | +| 6 | target_db | VARCHAR(64) | 目的数据库 | +| 7 | target_table | VARCHAR(192) | 流计算写入的目标表 | | 8 | watermark | BIGINT | watermark,详见 SQL 手册流式计算。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 9 | trigger | INT | 计算结果推送模式,详见 SQL 手册流式计算。需要注意,`trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | From 59d8f7d10fa8ad0d0bb1920182fe1fdbe41303eb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 18 Sep 2023 17:55:00 +0800 Subject: [PATCH 08/10] fix:return in progress in drop consumer group --- source/dnode/mnode/impl/src/mndSubscribe.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3131112c9a..17c63529ed 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -875,7 +875,11 @@ end: mndReleaseSubscribe(pMnode, pSub); mndTransDrop(pTrans); - return code; + if (code != 0) { + mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic); + return code; + } + return TSDB_CODE_ACTION_IN_PROGRESS; } void mndCleanupSubscribe(SMnode *pMnode) {} From c7b8d572f6fc9c1fecf92370925159e457aa25c6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 18 Sep 2023 18:25:22 +0800 Subject: [PATCH 09/10] fix(stream): set the output normal for stream dispatch when encountering the transfer state. --- source/libs/stream/src/streamDispatch.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4d5234a68c..7003c570e9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -990,6 +990,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } + // now ready for next data output + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); return TSDB_CODE_SUCCESS; } From b1797834c1ae269a65f9dca11bbc8fd2444957a3 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 18 Sep 2023 20:58:20 +0800 Subject: [PATCH 10/10] fix: set snapshot writer compression --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index ed4257b86d..01b42f1f6a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1045,6 +1045,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr writer[0]->precision = pTsdb->keepCfg.precision; writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS); writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize; writer[0]->compactVersion = INT64_MAX;