From 77e63d0922f6d230a314d28863744185faab8aa5 Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Thu, 26 Sep 2024 14:28:19 +0800 Subject: [PATCH 01/20] fix:[TD-32334] Generate correct time window when using interp with fill next and linear. --- source/libs/executor/src/timesliceoperator.c | 20 ++-- tests/system-test/2-query/interp.py | 120 ++++++++++++------- 2 files changed, 90 insertions(+), 50 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 2ea300ace8..70bf26405e 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -278,7 +278,7 @@ static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t in } static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, - SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo, bool genAfterBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; int32_t rows = pResBlock->info.rows; @@ -427,7 +427,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp break; } - if (start.key == INT64_MIN || end.key == INT64_MIN) { + if (start.key == INT64_MIN || end.key == INT64_MIN || genAfterBlock) { colDataSetNULL(pDst, rows); break; } @@ -463,8 +463,13 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp break; } + if (genAfterBlock && rows == 0) { + hasInterp = false; + break; + } + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot); - if (pkey->isNull == false) { + if (pkey->isNull == false && !genAfterBlock) { code = colDataSetVal(pDst, rows, pkey->pData, false); QUERY_CHECK_CODE(code, lino, _end); } else { @@ -836,7 +841,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo) && + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { break; } else { @@ -864,7 +869,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS doKeepLinearInfo(pSliceInfo, pBlock, i); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo) && + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { break; } else { @@ -909,13 +914,12 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato SSDataBlock* pResBlock = pSliceInfo->pRes; SInterval* pInterval = &pSliceInfo->interval; - if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR || - pSliceInfo->pPrevGroupKey == NULL) { + if (pSliceInfo->pPrevGroupKey == NULL) { return; } while (pSliceInfo->current <= pSliceInfo->win.ekey) { - (void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo); + (void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo, true); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index bcfc389d7b..3cdf52725a 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -907,7 +907,7 @@ class TDTestCase: ## {. . .} tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)") - tdSql.checkRows(12) + tdSql.checkRows(13) tdSql.checkData(0, 0, 5) tdSql.checkData(1, 0, 5) tdSql.checkData(2, 0, 10) @@ -920,6 +920,7 @@ class TDTestCase: tdSql.checkData(9, 0, 15) tdSql.checkData(10, 0, 15) tdSql.checkData(11, 0, 15) + tdSql.checkData(12, 0, None) ## {} ... tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:01', '2020-02-01 00:00:04') every(1s) fill(next)") @@ -957,10 +958,12 @@ class TDTestCase: ## ..{.} tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:13', '2020-02-01 00:00:17') every(1s) fill(next)") - tdSql.checkRows(3) + tdSql.checkRows(5) tdSql.checkData(0, 0, 15) tdSql.checkData(1, 0, 15) tdSql.checkData(2, 0, 15) + tdSql.checkData(3, 0, None) + tdSql.checkData(4, 0, None) ## ... {} tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:16', '2020-02-01 00:00:19') every(1s) fill(next)") @@ -1272,7 +1275,7 @@ class TDTestCase: tdSql.checkData(8, 1, True) tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:04', '2020-02-01 00:00:16') every(1s) fill(next)") - tdSql.checkRows(12) + tdSql.checkRows(13) tdSql.checkCols(3) tdSql.checkData(0, 0, '2020-02-01 00:00:04.000') @@ -1287,6 +1290,7 @@ class TDTestCase: tdSql.checkData(9, 0, '2020-02-01 00:00:13.000') tdSql.checkData(10, 0, '2020-02-01 00:00:14.000') tdSql.checkData(11, 0, '2020-02-01 00:00:15.000') + tdSql.checkData(12, 0, '2020-02-01 00:00:16.000') tdSql.checkData(0, 1, True) tdSql.checkData(1, 1, False) @@ -1300,6 +1304,7 @@ class TDTestCase: tdSql.checkData(9, 1, True) tdSql.checkData(10, 1, True) tdSql.checkData(11, 1, False) + tdSql.checkData(12, 1, True) tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname} range('2020-02-01 00:00:05', '2020-02-01 00:00:15') every(2s) fill(next)") tdSql.checkRows(6) @@ -1677,9 +1682,13 @@ class TDTestCase: ## | . | { | .} | tdSql.query(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(next)") - tdSql.checkRows(2) + tdSql.checkRows(6) tdSql.checkData(0, 0, 15) tdSql.checkData(1, 0, 15) + tdSql.checkData(2, 0, None) + tdSql.checkData(3, 0, None) + tdSql.checkData(4, 0, None) + tdSql.checkData(5, 0, None) # test fill linear @@ -2732,7 +2741,7 @@ class TDTestCase: tdSql.checkData(4, i, 15) tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(next)") - tdSql.checkRows(3) + tdSql.checkRows(5) tdSql.checkCols(4) for i in range (tdSql.queryCols): @@ -2828,7 +2837,7 @@ class TDTestCase: # test fill next tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(next)") - tdSql.checkRows(18) + tdSql.checkRows(19) tdSql.checkCols(3) tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') @@ -2851,6 +2860,7 @@ class TDTestCase: tdSql.checkData(15, 2, None) tdSql.checkData(16, 2, None) tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, None) tdSql.checkData(17, 0, '2020-02-02 00:00:17.000') @@ -3081,7 +3091,7 @@ class TDTestCase: # test fill linear tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(linear)") - tdSql.checkRows(17) + tdSql.checkRows(18) tdSql.checkCols(3) tdSql.checkData(0, 0, '2020-02-02 00:00:01.000') @@ -3103,8 +3113,9 @@ class TDTestCase: tdSql.checkData(14, 2, None) tdSql.checkData(15, 2, None) tdSql.checkData(16, 2, None) + tdSql.checkData(17, 2, None) - tdSql.checkData(16, 0, '2020-02-02 00:00:17.000') + tdSql.checkData(17, 0, '2020-02-02 00:00:18.000') tdLog.printNoPrefix("==========step13:test error cases") @@ -3220,7 +3231,7 @@ class TDTestCase: tdSql.checkData(17, 1, True) tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") - tdSql.checkRows(18) + tdSql.checkRows(19) tdSql.checkData(0, 0, '2020-02-01 00:00:00.000') tdSql.checkData(0, 1, True) @@ -3243,9 +3254,12 @@ class TDTestCase: tdSql.checkData(15, 2, 15) tdSql.checkData(16, 2, 17) tdSql.checkData(17, 2, 17) + tdSql.checkData(18, 2, None) tdSql.checkData(17, 0, '2020-02-01 00:00:17.000') tdSql.checkData(17, 1, False) + tdSql.checkData(18, 0, '2020-02-01 00:00:18.000') + tdSql.checkData(18, 1, True) tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") tdSql.checkRows(17) @@ -3362,24 +3376,24 @@ class TDTestCase: tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") - tdSql.checkRows(48) - for i in range(0, 14): + tdSql.checkRows(57) + for i in range(0, 19): tdSql.checkData(i, 0, 'ctb1') - for i in range(14, 30): + for i in range(19, 38): tdSql.checkData(i, 0, 'ctb2') - for i in range(30, 48): + for i in range(38, 57): tdSql.checkData(i, 0, 'ctb3') tdSql.checkData(0, 1, '2020-02-01 00:00:00.000') - tdSql.checkData(13, 1, '2020-02-01 00:00:13.000') + tdSql.checkData(18, 1, '2020-02-01 00:00:18.000') - tdSql.checkData(14, 1, '2020-02-01 00:00:00.000') - tdSql.checkData(29, 1, '2020-02-01 00:00:15.000') + tdSql.checkData(19, 1, '2020-02-01 00:00:00.000') + tdSql.checkData(37, 1, '2020-02-01 00:00:18.000') - tdSql.checkData(30, 1, '2020-02-01 00:00:00.000') - tdSql.checkData(47, 1, '2020-02-01 00:00:17.000') + tdSql.checkData(38, 1, '2020-02-01 00:00:00.000') + tdSql.checkData(56, 1, '2020-02-01 00:00:18.000') for i in range(0, 2): tdSql.checkData(i, 3, 1) @@ -3390,24 +3404,33 @@ class TDTestCase: for i in range(8, 14): tdSql.checkData(i, 3, 13) - for i in range(14, 18): + for i in range(14, 19): + tdSql.checkData(i, 3, None) + + for i in range(19, 23): tdSql.checkData(i, 3, 3) - for i in range(18, 24): + for i in range(23, 29): tdSql.checkData(i, 3, 9) - for i in range(24, 30): + for i in range(29, 35): tdSql.checkData(i, 3, 15) - for i in range(30, 36): + for i in range(35, 38): + tdSql.checkData(i, 3, None) + + for i in range(38, 44): tdSql.checkData(i, 3, 5) - for i in range(36, 42): + for i in range(44, 50): tdSql.checkData(i, 3, 11) - for i in range(42, 48): + for i in range(50, 56): tdSql.checkData(i, 3, 17) + for i in range(56, 57): + tdSql.checkData(i, 3, None) + tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") tdSql.checkRows(39) @@ -3450,7 +3473,7 @@ class TDTestCase: tdSql.checkRows(90) tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") - tdSql.checkRows(90) + tdSql.checkRows(171) tdSql.query(f"select c0, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by c0 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") tdSql.checkRows(9) @@ -3467,7 +3490,7 @@ class TDTestCase: tdSql.checkRows(48) tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(next)") - tdSql.checkRows(48) + tdSql.checkRows(57) tdSql.query(f"select t1, _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by t1 range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(linear)") tdSql.checkRows(39) @@ -4363,7 +4386,7 @@ class TDTestCase: tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{tbname_null} range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)") - tdSql.checkRows(9) + tdSql.checkRows(11) tdSql.checkData(0, 1, False) tdSql.checkData(1, 1, True) tdSql.checkData(2, 1, False) @@ -4373,6 +4396,8 @@ class TDTestCase: tdSql.checkData(6, 1, True) tdSql.checkData(7, 1, False) tdSql.checkData(8, 1, False) + tdSql.checkData(9, 1, True) + tdSql.checkData(10, 1, True) tdSql.checkData(0, 2, 1) tdSql.checkData(1, 2, 3) @@ -4383,11 +4408,13 @@ class TDTestCase: tdSql.checkData(6, 2, 8) tdSql.checkData(7, 2, 8) tdSql.checkData(8, 2, 9) + tdSql.checkData(9, 2, None) + tdSql.checkData(10, 2, None) tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{tbname_null} where c0 is not null range('2020-02-02 00:00:01', '2020-02-02 00:00:11') every(1s) fill(next)") - tdSql.checkRows(9) + tdSql.checkRows(11) tdSql.checkData(0, 1, False) tdSql.checkData(1, 1, True) tdSql.checkData(2, 1, False) @@ -4397,6 +4424,9 @@ class TDTestCase: tdSql.checkData(6, 1, True) tdSql.checkData(7, 1, False) tdSql.checkData(8, 1, False) + tdSql.checkData(9, 1, True) + tdSql.checkData(10, 1, True) + tdSql.checkData(0, 2, 1) tdSql.checkData(1, 2, 3) @@ -4407,6 +4437,8 @@ class TDTestCase: tdSql.checkData(6, 2, 8) tdSql.checkData(7, 2, 8) tdSql.checkData(8, 2, 9) + tdSql.checkData(9, 2, None) + tdSql.checkData(10, 2, None) # super table tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)") @@ -4443,7 +4475,7 @@ class TDTestCase: tdSql.query(f"select _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)") - tdSql.checkRows(8) + tdSql.checkRows(9) tdSql.checkData(0, 1, False) tdSql.checkData(1, 1, True) tdSql.checkData(2, 1, True) @@ -4452,6 +4484,7 @@ class TDTestCase: tdSql.checkData(5, 1, True) tdSql.checkData(6, 1, False) tdSql.checkData(7, 1, False) + tdSql.checkData(8, 1, True) tdSql.checkData(0, 2, 1) tdSql.checkData(1, 2, 9) @@ -4461,11 +4494,12 @@ class TDTestCase: tdSql.checkData(5, 2, 13) tdSql.checkData(6, 2, 13) tdSql.checkData(7, 2, 15) + tdSql.checkData(8, 2, None) tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)") - tdSql.checkRows(8) + tdSql.checkRows(9) tdSql.checkData(0, 1, False) tdSql.checkData(1, 1, True) tdSql.checkData(2, 1, True) @@ -4474,6 +4508,7 @@ class TDTestCase: tdSql.checkData(5, 1, True) tdSql.checkData(6, 1, False) tdSql.checkData(7, 1, False) + tdSql.checkData(8, 1, True) tdSql.checkData(0, 2, 1) tdSql.checkData(1, 2, 9) @@ -4483,36 +4518,37 @@ class TDTestCase: tdSql.checkData(5, 2, 13) tdSql.checkData(6, 2, 13) tdSql.checkData(7, 2, 15) + tdSql.checkData(8, 2, None) tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0, 1) from {dbname}.{stbname_null} partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)") - tdSql.checkRows(15) - for i in range(0, 7): + tdSql.checkRows(18) + for i in range(0, 9): tdSql.checkData(i, 0, 'ctb1_null') - for i in range(7, 15): + for i in range(9, 18): tdSql.checkData(i, 0, 'ctb2_null') tdSql.checkData(0, 1, '2020-02-01 00:00:01.000') - tdSql.checkData(6, 1, '2020-02-01 00:00:13.000') + tdSql.checkData(8, 1, '2020-02-01 00:00:17.000') - tdSql.checkData(7, 1, '2020-02-01 00:00:01.000') - tdSql.checkData(14, 1, '2020-02-01 00:00:15.000') + tdSql.checkData(9, 1, '2020-02-01 00:00:01.000') + tdSql.checkData(17, 1, '2020-02-01 00:00:17.000') tdSql.query(f"select tbname, _irowts, _isfilled, interp(c0) from {dbname}.{stbname_null} where c0 is not null partition by tbname range('2020-02-01 00:00:01', '2020-02-01 00:00:17') every(2s) fill(next)") - tdSql.checkRows(15) - for i in range(0, 7): + tdSql.checkRows(18) + for i in range(0, 9): tdSql.checkData(i, 0, 'ctb1_null') - for i in range(7, 15): + for i in range(9, 18): tdSql.checkData(i, 0, 'ctb2_null') tdSql.checkData(0, 1, '2020-02-01 00:00:01.000') - tdSql.checkData(6, 1, '2020-02-01 00:00:13.000') + tdSql.checkData(8, 1, '2020-02-01 00:00:17.000') - tdSql.checkData(7, 1, '2020-02-01 00:00:01.000') - tdSql.checkData(14, 1, '2020-02-01 00:00:15.000') + tdSql.checkData(9, 1, '2020-02-01 00:00:01.000') + tdSql.checkData(17, 1, '2020-02-01 00:00:17.000') # fill linear # normal table From 420482720fe105004df2d3bb60abdee73e24beb5 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 27 Sep 2024 09:09:45 +0800 Subject: [PATCH 02/20] ret check for taosMemoryAlloc --- source/libs/parser/src/parAstCreater.c | 12 ++++++------ source/libs/parser/src/parAstParser.c | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index e52c8865c7..8904679072 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -28,12 +28,12 @@ } \ } while (0) -#define CHECK_OUT_OF_MEM(p) \ - do { \ - if (NULL == (p)) { \ - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \ - goto _err; \ - } \ +#define CHECK_OUT_OF_MEM(p) \ + do { \ + if (NULL == (p)) { \ + pCxt->errCode = terrno; \ + goto _err; \ + } \ } while (0) #define CHECK_PARSER_STATUS(pCxt) \ diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index f4dd91f392..10d9b19e7f 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -47,6 +47,7 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) { SAstCreateContext cxt; initAstCreateContext(pParseCxt, &cxt); void* pParser = ParseAlloc((FMalloc)taosMemoryMalloc); + if (!pParser) return terrno; int32_t i = 0; while (1) { SToken t0 = {0}; From 75e4c5ea816d09248a7163e2f3ec80f7aacdfe24 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 26 Sep 2024 19:43:16 +0800 Subject: [PATCH 03/20] fix dbcache obj use after free --- source/libs/catalog/src/ctgCache.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index eafd85a504..f95c76c1cb 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1583,7 +1583,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { SCtgTSMACache *pCtgCache = NULL; (void)tNameGetFullDbName(pName, dbFName); - CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache)); + CTG_ERR_JRET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache)); if (NULL == pDbCache || !pDbCache->tsmaCache) { goto _return; } @@ -1613,6 +1613,7 @@ int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { CTG_ERR_JRET(ctgEnqueue(pCtg, pOp)); taosHashRelease(pDbCache->tsmaCache, pCtgCache); + ctgReleaseDBCache(pCtg, pDbCache); return TSDB_CODE_SUCCESS; @@ -1621,6 +1622,9 @@ _return: if (pCtgCache) { taosHashRelease(pDbCache->tsmaCache, pCtgCache); } + if (pDbCache) { + ctgReleaseDBCache(pCtg, pDbCache); + } if (pOp) { taosMemoryFree(pOp->data); taosMemoryFree(pOp); @@ -3996,17 +4000,20 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx if (pCache->retryFetch || hasOutOfDateTSMACache(pCache->pTsmas)) { CTG_UNLOCK(CTG_READ, &pCache->tsmaLock); - taosHashRelease(dbCache->tsmaCache, pCache); ctgDebug("tsma for tb: %s.%s not in cache", tsmaSourceTbName.tname, dbFName); CTG_ERR_JRET(ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag, FETCH_TB_TSMA, &tsmaSourceTbName)); if (NULL == taosArrayPush(pCtx->pResList, &(SMetaRes){0})) { + taosHashRelease(dbCache->tsmaCache, pCache); CTG_ERR_JRET(terrno); } CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1); + CTG_LOCK(CTG_WRITE, &pCache->tsmaLock); pCache->retryFetch = false; + CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock); + taosHashRelease(dbCache->tsmaCache, pCache); continue; } From 64cb7a1d5ebb5a645c2c00271b8b8ef1ae3b0a32 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 09:27:58 +0800 Subject: [PATCH 04/20] fix(query): fix memory leak. --- source/libs/executor/src/sysscanoperator.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index dcebdf59a9..a210c147a2 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2846,6 +2846,8 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t)); if (pCond->colList == NULL || pCond->pSlotList == NULL) { + taosMemoryFree(pCond->colList); + taosMemoryFree(pCond->pSlotList); return terrno; } From b1942889d8468df41e9df7e61dd208a93279d177 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 09:34:46 +0800 Subject: [PATCH 05/20] refactor: remove unused code. --- source/libs/executor/src/sysscanoperator.c | 9 --------- 1 file changed, 9 deletions(-) diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index a210c147a2..fec7c3bc6c 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -131,9 +131,6 @@ const SSTabFltFuncDef filterDict[] = { static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName, int64_t* pRows); -static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns", - "ttl", "stable_name", "vgroup_id', 'uid", "type"}; - static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"}; static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); @@ -2822,12 +2819,6 @@ _end: return code; } -static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { - SSDataBlock* pRes = NULL; - int32_t code = doBlockInfoScanNext(pOperator, &pRes); - return pRes; -} - static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); From 3214157694be09bdf3a22e6e423aad377677a7f2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 27 Sep 2024 11:17:03 +0800 Subject: [PATCH 06/20] valid iter --- source/libs/stream/src/streamBackendRocksdb.c | 9 +++++++++ source/libs/stream/src/streamState.c | 16 ++++++++-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 7a8be34781..a2c9012df5 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3657,6 +3657,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + if (pCur->iter == NULL) { + streamStateFreeCur(pCur); + return NULL; + } char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -3679,6 +3683,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta TAOS_UNUSED(stateSessionKeyDecode(&curKey, (char*)iKey)); if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { streamStateFreeCur(pCur); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 41ff8f3c24..1801c6e029 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -99,8 +99,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { } SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); @@ -170,12 +170,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* int32_t lino = 0; void* pVal = NULL; int32_t len = getRowStateRowSize(pState->pFileState); - int32_t tmpLen = len; + int32_t tmpLen = len; code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &tmpLen); QUERY_CHECK_CODE(code, lino, _end); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); _end: @@ -189,12 +189,12 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa int32_t lino = 0; void* pVal = NULL; int32_t len = getRowStateRowSize(pState->pFileState); - int32_t tmpLen = len; + int32_t tmpLen = len; code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &tmpLen); QUERY_CHECK_CODE(code, lino, _end); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; - int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + int32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; streamStateReleaseBuf(pState, pVal, false); From 9a7fec10afc3c4d835a6c6f288e446a5deaef85c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 27 Sep 2024 13:32:28 +0800 Subject: [PATCH 07/20] fix: task reschedule issue --- source/libs/scheduler/src/schTask.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a0275d9c21..3a63889271 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -358,6 +358,11 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) { SSchRedirectCtx *pCtx = &pTask->redirectCtx; + if (JOB_TASK_STATUS_EXEC == pTask->status) { + SCH_TASK_DLOG("task not start yet, rspCode:%d", rspCode); + return TSDB_CODE_SUCCESS; + } + if (!pCtx->inRedirect) { pCtx->inRedirect = true; pCtx->periodMs = tsRedirectPeriod; From 03c33e77aa71efc1c7d287d8f6fc707dc24175f1 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Fri, 27 Sep 2024 13:34:50 +0800 Subject: [PATCH 08/20] fix(query)[TD-32353]. Fix error handling during tag scan Correct the row count in the result data block when the tag scan tolerates partial errors. This prevents upper-layer operators from accessing invalid memory address. --- source/libs/executor/src/scanoperator.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a2e2dc9b98..c13535b7f6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4294,13 +4294,13 @@ _error: return code; } -static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, - SStorageAPI* pAPI) { +static int32_t doTagScanOneTable(SOperatorInfo* pOperator, SSDataBlock* pRes, SMetaReader* mr, SStorageAPI* pAPI) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STagScanInfo* pInfo = pOperator->info; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; + int32_t count = pRes->info.rows; STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); if (!item) { @@ -4360,6 +4360,8 @@ _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; + } else { + pRes->info.rows++; } return code; @@ -4715,26 +4717,23 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock* return code; } - int32_t count = 0; SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); + pRes->info.rows = 0; - while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); + while (pInfo->curPos < size && pRes->info.rows < pOperator->resultInfo.capacity) { + code = doTagScanOneTable(pOperator, pRes, &mr, &pTaskInfo->storageAPI); if (code != TSDB_CODE_OUT_OF_MEMORY) { // ignore other error code = TSDB_CODE_SUCCESS; } QUERY_CHECK_CODE(code, lino, _end); - ++count; if (++pInfo->curPos >= size) { setOperatorCompleted(pOperator); } } - pRes->info.rows = count; - pAPI->metaReaderFn.clearReader(&mr); bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo); if (bLimitReached) { From 6370e5b7828b2a5552a2f586ea39cc6638fbb4b8 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Fri, 27 Sep 2024 14:10:05 +0800 Subject: [PATCH 09/20] fix mem leak at walFindCurMetaVer --- source/libs/wal/src/walMeta.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 7ea98d648d..9943fd1701 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -937,6 +937,7 @@ static int walFindCurMetaVer(SWal* pWal) { TdDirPtr pDir = taosOpenDir(pWal->path); if (pDir == NULL) { wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, tstrerror(terrno)); + regfree(&walMetaRegexPattern); return terrno; } @@ -956,6 +957,7 @@ static int walFindCurMetaVer(SWal* pWal) { } if (taosCloseDir(&pDir) != 0) { wError("failed to close dir, ret:%s", tstrerror(terrno)); + regfree(&walMetaRegexPattern); return terrno; } regfree(&walMetaRegexPattern); From c52da5ab33624361552f2946613fd18bd1b3316c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 27 Sep 2024 14:21:51 +0800 Subject: [PATCH 10/20] adjust log level --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a2e2dc9b98..4a99497d07 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -890,7 +890,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { } else { int32_t code = taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qDebug("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } } } From 54ea1466fcdc7fa2517c73d042ec76ecf2b3192a Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Fri, 27 Sep 2024 15:24:19 +0800 Subject: [PATCH 11/20] fix: cpy mem overflow --- source/dnode/mnode/impl/src/mndStreamTrans.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 7171d44da4..c08478b359 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -324,7 +324,9 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { size_t len = 0; void *pKey = taosHashGetKey(pDb, &len); - tstrncpy(p, pKey, 128); + int cpLen = MIN(127, len); + TAOS_STRNCPY(p, pKey, cpLen); + p[cpLen] = '\0'; int32_t code = doKillCheckpointTrans(pMnode, pKey, len); if (code) { From 3d54d9b8a3c1851839406ef346aefabebb3776dc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 27 Sep 2024 15:34:17 +0800 Subject: [PATCH 12/20] fix double send resp --- source/dnode/mnode/impl/src/mndStb.c | 48 +++++++++++++++------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 56461e9cfd..0e4fda1510 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1012,10 +1012,10 @@ _OVER: int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { mndTransSetDbName(pTrans, pDb->name, pStb->name); - TAOS_CHECK_RETURN (mndTransCheckConflict(pMnode, pTrans)); - TAOS_CHECK_RETURN (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb)); - TAOS_CHECK_RETURN (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb)); - TAOS_CHECK_RETURN (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb)); + TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); + TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb)); + TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb)); + TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb)); return 0; } @@ -1051,7 +1051,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { SRpcMsg rpcMsg = { .msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info}; - SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); + SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); code = tmsgSendReq(&epSet, &rpcMsg); if (code != 0) { mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code); @@ -1500,8 +1500,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SMqTopicObj *pTopic = NULL; pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); @@ -1562,8 +1562,8 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -1616,8 +1616,8 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SSmaObj *pSma = NULL; pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); @@ -2233,7 +2233,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) { int32_t code = 0; - char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName); SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName); @@ -2278,7 +2278,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bo static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { int32_t code = 0; - char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); SDbObj *pDb = mndAcquireDb(pMnode, dbFName); @@ -2302,7 +2302,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) { int32_t code = 0; - char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); SDbObj *pDb = mndAcquireDb(pMnode, dbFName); @@ -2656,7 +2656,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - SName name = {0}; + SName name = {0}; int32_t ret = 0; if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret)); @@ -2779,8 +2779,8 @@ _OVER: static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) { int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SMqTopicObj *pTopic = NULL; pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); @@ -2839,8 +2839,8 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) { int32_t code = 0; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); @@ -2945,7 +2945,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { code = mndDropStb(pMnode, pReq, pDb, pStb); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - SName name = {0}; + SName name = {0}; int32_t ret = 0; if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0) mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret)); @@ -3016,7 +3016,7 @@ _OVER: mndReleaseUser(pMnode, pUser); tFreeSTableMetaRsp(&metaRsp); - //TODO change to TAOS_RETURN + // TODO change to TAOS_RETURN return code; } @@ -3562,7 +3562,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc SName name = {0}; - char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN); varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -4259,7 +4259,9 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) { code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId); if (code) goto _OVER; } - if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0; + if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } _OVER: tFreeSMDropTbsReq(&dropReq); if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx); From d7907da75d6174602645e74329d0bef606c0f4e5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 27 Sep 2024 15:46:09 +0800 Subject: [PATCH 13/20] fix double send resp --- source/dnode/mnode/impl/src/mndStb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 0e4fda1510..b8cf72cd9e 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -4460,7 +4460,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) { code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId); if (code) goto _end; - if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0; + if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _end: if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx); tDecoderClear(&decoder); From f6f979e6ea31c1308d4202dcbf87d683b3ceeac6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 16:00:52 +0800 Subject: [PATCH 14/20] fix(stream): fix memory leaks. --- source/libs/stream/src/streamExec.c | 93 ++++++++++++++++------------- 1 file changed, 51 insertions(+), 42 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 88e40b247b..e4dc0b5854 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,8 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, + int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState pState = streamTaskGetStatus(pTask); @@ -95,17 +96,50 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } +static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, + SArray* pRes) { + SSDataBlock block = {.info.type = STREAM_PULL_OVER, .info.childId = pTask->info.selfChildId}; + int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); + if (num != 1) { + stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); + return TSDB_CODE_INVALID_PARA; + } + + void* p = taosArrayGet(pRetrieveBlock->blocks, 0); + int32_t code = assignOneDataBlock(&block, p); + if (code) { + stError("s-task:%s failed to assign retrieve block, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + p = taosArrayPush(pRes, &block); + if (p != NULL) { + (*pNumOfBlocks) += 1; + stDebug("s-task:%s(child %d) retrieve res from upstream completed, QID:0x%" PRIx64, pTask->id.idStr, + pTask->info.selfChildId, pRetrieveBlock->reqId); + } else { + code = terrno; + stError("s-task:%s failed to append pull over block for retrieve data, QID:0x%" PRIx64" code:%s", pTask->id.idStr, + pRetrieveBlock->reqId, tstrerror(code)); + } + + return code; +} + int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { - int32_t code = TSDB_CODE_SUCCESS; - void* pExecutor = pTask->exec.pExecutor; int32_t size = 0; int32_t numOfBlocks = 0; + int32_t code = TSDB_CODE_SUCCESS; + void* pExecutor = pTask->exec.pExecutor; SArray* pRes = NULL; *totalBlocks = 0; *totalSize = 0; while (1) { + SSDataBlock* output = NULL; + uint64_t ts = 0; + if (pRes == NULL) { pRes = taosArrayInit(4, sizeof(SSDataBlock)); } @@ -115,8 +149,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* return code; } - SSDataBlock* output = NULL; - uint64_t ts = 0; if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { resetTaskInfo(pExecutor); @@ -124,6 +156,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) { stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code)); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } else { qResetTaskCode(pExecutor); @@ -133,33 +166,11 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; - - int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); - if (num != 1) { - stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); - continue; - } - - code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); - if (code) { - stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); - continue; - } - - block.info.type = STREAM_PULL_OVER; - block.info.childId = pTask->info.selfChildId; - - void* p = taosArrayPush(pRes, &block); - if (p != NULL) { - numOfBlocks += 1; - } else { - stError("s-task:%s failed to add retrieve block", pTask->id.idStr); - } - - stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, - pTask->info.selfChildId, pRetrieveBlock->reqId); + code = doAppendPullOverBlock(pTask, &numOfBlocks, (SStreamDataBlock*) pItem, pRes); + if (code) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return code; + } } break; @@ -174,26 +185,24 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* continue; // checkpoint block not dispatch to downstream tasks } - SSDataBlock block = {0}; + SSDataBlock block = {.info.childId = pTask->info.selfChildId}; code = assignOneDataBlock(&block, output); if (code) { stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); continue; } - block.info.childId = pTask->info.selfChildId; - size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); numOfBlocks += 1; void* p = taosArrayPush(pRes, &block); if (p == NULL) { stError("s-task:%s failed to add computing results, the final res may be incorrect", pTask->id.idStr); + } else { + stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); } - stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); - // current output should be dispatched to down stream nodes if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); @@ -303,7 +312,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr); return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } @@ -408,7 +417,7 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { } } else { if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING || - status == TASK_STATUS__STOP)) { + status == TASK_STATUS__STOP)) { stError("s-task:%s invalid task status:%d", id, status); return TSDB_CODE_STREAM_INTERNAL_ERROR; } @@ -718,7 +727,7 @@ int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpoi // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); - if(code) { + if (code) { stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code)); } @@ -833,7 +842,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (pState.state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue - } else { // todo refactor + } else { // todo refactor if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); } else { From 7274500a78f5a7a30b4c2f4236e70218b72cae13 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Fri, 27 Sep 2024 16:11:57 +0800 Subject: [PATCH 15/20] fix:When opening vnodes, clean up vnodes that are in the dropped state --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index b5aff49232..d081e70ff0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -297,11 +297,22 @@ static void *vmOpenVnodeInThread(void *param) { SVnodeMgmt *pMgmt = pThread->pMgmt; char path[TSDB_FILENAME_LEN]; - dInfo("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); + dInfo("thread:%d, start to open or destroy %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); for (int32_t v = 0; v < pThread->vnodeNum; ++v) { SWrapperCfg *pCfg = &pThread->pCfgs[v]; + if (pCfg->dropped) { + char stepDesc[TSDB_STEP_DESC_LEN] = {0}; + snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to destroy, %d of %d have been dropped", pCfg->vgId, + pMgmt->state.openVnodes, pMgmt->state.totalVnodes); + tmsgReportStartup("vnode-destroy", stepDesc); + + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); + vnodeDestroy(pCfg->vgId, path, pMgmt->pTfs, 0); + pThread->updateVnodesList = true; + continue; + } char stepDesc[TSDB_STEP_DESC_LEN] = {0}; snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId, From a621005f653092c7932e95815e6b3737fc903c22 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Fri, 27 Sep 2024 16:21:50 +0800 Subject: [PATCH 16/20] test: modify crash_gen default taos.cfg --- cmake/cmake.install | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/cmake/cmake.install b/cmake/cmake.install index 67634625ce..119bda1c38 100644 --- a/cmake/cmake.install +++ b/cmake/cmake.install @@ -7,7 +7,17 @@ ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/ COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/ COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/ - COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo firstEp localhost:6030 > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo fqdn localhost >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo serverPort 6030 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo debugFlag 135 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo asyncLog 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo supportVnodes 1024 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo numOfLogLines 300000000 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo logKeepDays -1 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo checkpointInterval 60 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo snodeAddress 127.0.0.1:873 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg + COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg From cc1cf679e1d73b73dea15ea98942cb03522e0bcc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 27 Sep 2024 16:47:52 +0800 Subject: [PATCH 17/20] fix: task redirect issue --- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schTask.c | 9 +++------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 0f3d1bfa81..8a156e8a06 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -61,6 +61,7 @@ typedef enum { #define SCH_MAX_TASK_TIMEOUT_USEC 300000000 #define SCH_DEFAULT_MAX_RETRY_NUM 6 #define SCH_MIN_AYSNC_EXEC_NUM 3 +#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3 typedef struct SSchDebug { bool lockEnable; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 3a63889271..375ad5fa37 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -358,11 +358,6 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode) { SSchRedirectCtx *pCtx = &pTask->redirectCtx; - if (JOB_TASK_STATUS_EXEC == pTask->status) { - SCH_TASK_DLOG("task not start yet, rspCode:%d", rspCode); - return TSDB_CODE_SUCCESS; - } - if (!pCtx->inRedirect) { pCtx->inRedirect = true; pCtx->periodMs = tsRedirectPeriod; @@ -371,9 +366,11 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, if (SCH_IS_DATA_BIND_TASK(pTask)) { if (pEpSet) { pCtx->roundTotal = pEpSet->numOfEps; - } else { + } else if (pTask->candidateAddrs && taosArrayGetSize(pTask->candidateAddrs) > 0) { SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); pCtx->roundTotal = pAddr->epSet.numOfEps; + } else { + pCtx->roundTotal = SCH_DEFAULT_RETRY_TOTAL_ROUND; } } else { pCtx->roundTotal = 1; From 7198202611b0867a5ed7b1e4a88ec88b0920741f Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Fri, 27 Sep 2024 18:21:01 +0800 Subject: [PATCH 18/20] fix: build on windows --- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index c08478b359..905a73ad48 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -324,7 +324,7 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { size_t len = 0; void *pKey = taosHashGetKey(pDb, &len); - int cpLen = MIN(127, len); + int cpLen = (127 < len) ? 127 : len; TAOS_STRNCPY(p, pKey, cpLen); p[cpLen] = '\0'; From 75a66459252287a3f00a45502c2a00cd76252fff Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 19:22:53 +0800 Subject: [PATCH 19/20] fix(stream): set the correct res block info. --- source/libs/stream/src/streamExec.c | 36 +++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e4dc0b5854..5300792338 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -98,7 +98,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, SStreamDataBlock* pRetrieveBlock, SArray* pRes) { - SSDataBlock block = {.info.type = STREAM_PULL_OVER, .info.childId = pTask->info.selfChildId}; + SSDataBlock block = {0}; int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); if (num != 1) { stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); @@ -112,6 +112,9 @@ static int32_t doAppendPullOverBlock(SStreamTask* pTask, int32_t* pNumOfBlocks, return code; } + block.info.type = STREAM_PULL_OVER; + block.info.childId = pTask->info.selfChildId; + p = taosArrayPush(pRes, &block); if (p != NULL) { (*pNumOfBlocks) += 1; @@ -171,6 +174,33 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } +// SSDataBlock block = {0}; +// const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; +// +// int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); +// if (num != 1) { +// stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); +// continue; +// } +// +// code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); +// if (code) { +// stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); +// continue; +// } +// +// block.info.type = STREAM_PULL_OVER; +// block.info.childId = pTask->info.selfChildId; +// +// void* p = taosArrayPush(pRes, &block); +// if (p != NULL) { +// numOfBlocks += 1; +// } else { +// stError("s-task:%s failed to add retrieve block", pTask->id.idStr); +// } +// +// stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, +// pTask->info.selfChildId, pRetrieveBlock->reqId); } break; @@ -185,13 +215,15 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* continue; // checkpoint block not dispatch to downstream tasks } - SSDataBlock block = {.info.childId = pTask->info.selfChildId}; + SSDataBlock block = {0}; code = assignOneDataBlock(&block, output); if (code) { stError("s-task:%s failed to build result block due to out of memory", pTask->id.idStr); continue; } + block.info.childId = pTask->info.selfChildId; + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); numOfBlocks += 1; From 70c96783c01e13db7b1495357e5a6f73492cc1f0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 27 Sep 2024 19:23:24 +0800 Subject: [PATCH 20/20] refactor: remove unused code. --- source/libs/stream/src/streamExec.c | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5300792338..0eb87df9b0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -174,33 +174,6 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } -// SSDataBlock block = {0}; -// const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; -// -// int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); -// if (num != 1) { -// stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); -// continue; -// } -// -// code = assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); -// if (code) { -// stError("s-task:%s failed to copy datablock, code:%s", pTask->id.idStr, tstrerror(code)); -// continue; -// } -// -// block.info.type = STREAM_PULL_OVER; -// block.info.childId = pTask->info.selfChildId; -// -// void* p = taosArrayPush(pRes, &block); -// if (p != NULL) { -// numOfBlocks += 1; -// } else { -// stError("s-task:%s failed to add retrieve block", pTask->id.idStr); -// } -// -// stDebug("s-task:%s(child %d) retrieve process completed,QID:0x%" PRIx64 " dump results", pTask->id.idStr, -// pTask->info.selfChildId, pRetrieveBlock->reqId); } break;