From ce69387a43af1c57a8c1da602c4ab363cd1872f3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Dec 2024 10:32:32 +0800 Subject: [PATCH 01/10] ci(stream):adjust stream ci test --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 10 ++++- source/libs/parser/src/parTranslater.c | 2 +- tests/parallel_test/cases.task | 44 ++++++++++--------- .../tsim/stream/streamInterpDelete0.sim | 2 + .../tsim/stream/streamInterpDelete1.sim | 2 + .../tsim/stream/streamInterpDelete2.sim | 2 + .../script/tsim/stream/streamInterpError.sim | 2 + .../tsim/stream/streamInterpHistory.sim | 2 + .../script/tsim/stream/streamInterpLarge.sim | 2 + .../tsim/stream/streamInterpLinear0.sim | 2 + .../script/tsim/stream/streamInterpNext0.sim | 2 + .../script/tsim/stream/streamInterpOther.sim | 2 + .../tsim/stream/streamInterpPartitionBy0.sim | 2 + .../tsim/stream/streamInterpPartitionBy1.sim | 2 + .../tsim/stream/streamInterpPrimaryKey0.sim | 2 + .../tsim/stream/streamInterpPrimaryKey1.sim | 2 + .../tsim/stream/streamInterpPrimaryKey3.sim | 2 + .../script/tsim/stream/streamInterpUpdate.sim | 2 + .../tsim/stream/streamInterpUpdate1.sim | 2 + .../tsim/stream/streamInterpUpdate2.sim | 2 + .../script/tsim/stream/streamInterpValue0.sim | 2 + 22 files changed, 70 insertions(+), 23 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e6333d2ddc..8180243ff0 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -291,6 +291,7 @@ extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; extern int32_t tsResolveFQDNRetryTime; +extern bool tsStreamCoverage; extern bool tsExperimental; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 905dcb4fda..83747b11e6 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -328,6 +328,7 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; +bool tsStreamCoverage = false; bool tsUpdateCacheBatch = true; @@ -733,6 +734,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); + + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY,CFG_CATEGORY_GLOBAL)); + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -1463,6 +1467,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "bypassFlag"); tsBypassFlag = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "streamCoverage"); + tsStreamCoverage = pItem->bval; + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -2735,7 +2742,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { {"maxTsmaCalcDelay", &tsMaxTsmaCalcDelay}, {"tsmaDataDeleteMark", &tsmaDataDeleteMark}, {"numOfRpcSessions", &tsNumOfRpcSessions}, - {"bypassFlag", &tsBypassFlag}}; + {"bypassFlag", &tsBypassFlag}, + {"streamCoverage", &tsStreamCoverage}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { code = taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index feaa6910f2..342bd6d66e 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -11411,7 +11411,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pSelect->hasInterpFunc) { // Temporary code - if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + if (tsStreamCoverage == false && pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Stream interp function only support force window close"); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 74274ab1ba..6c5ad9b23d 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1419,34 +1419,36 @@ ,,y,script,./test.sh -f tsim/stream/sliding.sim ,,y,script,./test.sh -f tsim/stream/state0.sim ,,y,script,./test.sh -f tsim/stream/state1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpDelete0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpDelete1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpDelete2.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpError.sim +,,y,script,./test.sh -f tsim/stream/streamInterpDelete0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpDelete1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpDelete2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpError.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim ,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim +,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim #,,y,script,./test.sh -f tsim/stream/streamInterpHistory1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpLinear0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpNext0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpOther.sim +,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim +,,y,script,./test.sh -f tsim/stream/streamInterpLinear0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpNext0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpOther.sim #,,y,script,./test.sh -f tsim/stream/streamInterpOther1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPartitionBy1.sim #,,y,script,./test.sh -f tsim/stream/streamInterpPrev0.sim #,,y,script,./test.sh -f tsim/stream/streamInterpPrev1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey0.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey2.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey3.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpUpdate.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpUpdate1.sim -#,,y,script,./test.sh -f tsim/stream/streamInterpValue0.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey0.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey1.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey2.sim -#,,y,script,./test.sh -f tsim/stream/streamPrimaryKey3.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey0.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpPrimaryKey3.sim +,,y,script,./test.sh -f tsim/stream/streamInterpUpdate.sim +,,y,script,./test.sh -f tsim/stream/streamInterpUpdate1.sim +,,y,script,./test.sh -f tsim/stream/streamInterpUpdate2.sim +,,y,script,./test.sh -f tsim/stream/streamInterpValue0.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey0.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey1.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey2.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey3.sim ,,y,script,./test.sh -f tsim/stream/streamTwaError.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFill.sim ,,y,script,./test.sh -f tsim/stream/streamTwaFwcFillPrimaryKey.sim diff --git a/tests/script/tsim/stream/streamInterpDelete0.sim b/tests/script/tsim/stream/streamInterpDelete0.sim index 21bac13e4a..440d7ce413 100644 --- a/tests/script/tsim/stream/streamInterpDelete0.sim +++ b/tests/script/tsim/stream/streamInterpDelete0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpDelete1.sim b/tests/script/tsim/stream/streamInterpDelete1.sim index 162da175e8..9413cf8918 100644 --- a/tests/script/tsim/stream/streamInterpDelete1.sim +++ b/tests/script/tsim/stream/streamInterpDelete1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpDelete2.sim b/tests/script/tsim/stream/streamInterpDelete2.sim index be27dcda49..fb53678eff 100644 --- a/tests/script/tsim/stream/streamInterpDelete2.sim +++ b/tests/script/tsim/stream/streamInterpDelete2.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpError.sim b/tests/script/tsim/stream/streamInterpError.sim index 53a92df772..f0f4e80ade 100644 --- a/tests/script/tsim/stream/streamInterpError.sim +++ b/tests/script/tsim/stream/streamInterpError.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step2 sql create database test2 vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpHistory.sim b/tests/script/tsim/stream/streamInterpHistory.sim index b9685ebf05..9737e7d155 100644 --- a/tests/script/tsim/stream/streamInterpHistory.sim +++ b/tests/script/tsim/stream/streamInterpHistory.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpLarge.sim b/tests/script/tsim/stream/streamInterpLarge.sim index 85203d2d9e..2626f49b6a 100644 --- a/tests/script/tsim/stream/streamInterpLarge.sim +++ b/tests/script/tsim/stream/streamInterpLarge.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpLinear0.sim b/tests/script/tsim/stream/streamInterpLinear0.sim index 7d4b28d545..c52540895b 100644 --- a/tests/script/tsim/stream/streamInterpLinear0.sim +++ b/tests/script/tsim/stream/streamInterpLinear0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpNext0.sim b/tests/script/tsim/stream/streamInterpNext0.sim index abdbeda634..4395031aec 100644 --- a/tests/script/tsim/stream/streamInterpNext0.sim +++ b/tests/script/tsim/stream/streamInterpNext0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpOther.sim b/tests/script/tsim/stream/streamInterpOther.sim index 8553e67ec8..4572bfca56 100644 --- a/tests/script/tsim/stream/streamInterpOther.sim +++ b/tests/script/tsim/stream/streamInterpOther.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 4; diff --git a/tests/script/tsim/stream/streamInterpPartitionBy0.sim b/tests/script/tsim/stream/streamInterpPartitionBy0.sim index 6b222de228..543bb48a1c 100644 --- a/tests/script/tsim/stream/streamInterpPartitionBy0.sim +++ b/tests/script/tsim/stream/streamInterpPartitionBy0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step prev print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPartitionBy1.sim b/tests/script/tsim/stream/streamInterpPartitionBy1.sim index ecb5e0ee62..c8138ac05f 100644 --- a/tests/script/tsim/stream/streamInterpPartitionBy1.sim +++ b/tests/script/tsim/stream/streamInterpPartitionBy1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step NULL print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey0.sim b/tests/script/tsim/stream/streamInterpPrimaryKey0.sim index 9edddff6db..1bbc2a9b5d 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey0.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey1.sim b/tests/script/tsim/stream/streamInterpPrimaryKey1.sim index 04a1f299be..0db33c9767 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey1.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey3.sim b/tests/script/tsim/stream/streamInterpPrimaryKey3.sim index 725cf8d850..23cb0a58e6 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey3.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey3.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpUpdate.sim b/tests/script/tsim/stream/streamInterpUpdate.sim index 59a188c2a6..394ac1a341 100644 --- a/tests/script/tsim/stream/streamInterpUpdate.sim +++ b/tests/script/tsim/stream/streamInterpUpdate.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpUpdate1.sim b/tests/script/tsim/stream/streamInterpUpdate1.sim index 45f16af35d..3987afa21e 100644 --- a/tests/script/tsim/stream/streamInterpUpdate1.sim +++ b/tests/script/tsim/stream/streamInterpUpdate1.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpUpdate2.sim b/tests/script/tsim/stream/streamInterpUpdate2.sim index 2a71474dd7..cde5b589e8 100644 --- a/tests/script/tsim/stream/streamInterpUpdate2.sim +++ b/tests/script/tsim/stream/streamInterpUpdate2.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; diff --git a/tests/script/tsim/stream/streamInterpValue0.sim b/tests/script/tsim/stream/streamInterpValue0.sim index bce7f0ece6..2cbf61f4bd 100644 --- a/tests/script/tsim/stream/streamInterpValue0.sim +++ b/tests/script/tsim/stream/streamInterpValue0.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; From bdef8850ca63e32c7271920a481ab3c73f3f07c2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Dec 2024 14:16:00 +0800 Subject: [PATCH 02/10] fix issue --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 83747b11e6..b732b7140a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -735,7 +735,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER_LAZY,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "streamCoverage", tsStreamCoverage, CFG_DYN_CLIENT, CFG_DYN_CLIENT, CFG_CATEGORY_LOCAL)); TAOS_RETURN(TSDB_CODE_SUCCESS); } From 5b10753ac08e9b54c50f19d2dbf8a1f6e15e5248 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Dec 2024 18:23:57 +0800 Subject: [PATCH 03/10] add ut --- source/libs/stream/inc/streamInt.h | 3 + source/libs/stream/src/streamCheckpoint.c | 6 +- .../libs/stream/test/streamCheckPointTest.cpp | 203 ++++++++++++++++++ 3 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 source/libs/stream/test/streamCheckPointTest.cpp diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c25b0f34fc..41ac0117f3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -250,6 +250,9 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); // inject stream errors void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId); +int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type); +int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 876cd1b472..0ec66cd2ce 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -21,7 +21,9 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); +#ifdef BUILD_NO_CALL static int32_t deleteCheckpoint(const char* id); +#endif static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, @@ -998,7 +1000,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } -static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { +int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { const char* id = pTask->id.idStr; SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -1492,6 +1494,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t che return 0; } +#ifdef BUILD_NO_CALL int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { stError("deleteCheckpoint parameters invalid"); @@ -1504,6 +1507,7 @@ int32_t deleteCheckpoint(const char* id) { } return 0; } +#endif int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp new file mode 100644 index 0000000000..cad02a514e --- /dev/null +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -0,0 +1,203 @@ +#include +#include "tstream.h" +#include "streamInt.h" + +TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->transId = 4561111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SCheckpointTriggerRsp pRsp; + memset(&pRsp, 0, sizeof(SCheckpointTriggerRsp)); + pRsp.rspCode = TSDB_CODE_SUCCESS; + pRsp.checkpointId = 123; + pRsp.transId = 456; + pRsp.upstreamTaskId = 789; + + code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp); + ASSERT_NE(code, TSDB_CODE_SUCCESS); + + pRsp.rspCode = TSDB_CODE_FAILED; + code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + tFreeStreamTask(pTask); +} + +TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + pInfo->failedId = 0; + + int64_t failedCheckpointId = 123; + + streamTaskSetFailedCheckpointId(pTask, failedCheckpointId); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + + streamTaskSetFailedCheckpointId(pTask, 0); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + + streamTaskSetFailedCheckpointId(pTask, pInfo->failedId - 1); + ASSERT_EQ(pInfo->failedId, failedCheckpointId); + tFreeStreamTask(pTask); +} + +TEST(UploadCheckpointDataTest, UploadSuccess) { + SStreamTask* pTask = NULL; + int64_t uid = 1111111111111111; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int64_t checkpointId = 123; + int64_t dbRefId = 1; + ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_S3; + + STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; + const char *path = "/tmp/backend3/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0); + ASSERT(code == 0); + + int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); + + EXPECT_EQ(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 on success"; +} + +TEST(UploadCheckpointDataTest, UploadDisabled) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int64_t checkpointId = 123; + int64_t dbRefId = 1; + + STaskDbWrapper* pBackend = NULL; + int64_t processVer = -1; + const char *path = "/tmp/backend4/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0); + ASSERT(pState != NULL); + + pTask->pBackend = pState->pTdbState->pOwner->pBackend; + + code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0); + ASSERT(code == 0); + + ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_DISABLE; + + int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); + + EXPECT_NE(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 when backup type is disabled"; +} + +TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->transId = 4561111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t downstreamNodeId = 1; + int64_t sendingCheckpointId = 123; + + STaskTriggerSendInfo triggerInfo = {.sendTs = taosGetTimestampMs(), .recved = false, .nodeId = downstreamNodeId }; + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + bool result = streamTaskAlreadySendTrigger(pTask, downstreamNodeId); + + EXPECT_TRUE(result) << "The trigger message should have been sent to the downstream node"; + + taosArrayDestroy(pTask->chkInfo.pActiveInfo->pDispatchTriggerList); +} + +TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { + SStreamTask* pTask = NULL; + int64_t uid = 2222222222222; + SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, + false, 1, &pTask); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + const char *path = "/tmp/backend5/stream"; + code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->chkInfo.pActiveInfo->activeId = 123111; + pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr.launchChkptId = pTask->chkInfo.pActiveInfo->activeId; + pTask->chkInfo.pActiveInfo->transId = 4561111; + pTask->chkInfo.startTs = 11111; + + streamTaskSetStatusReady(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + int32_t downstreamNodeId = 1; + int64_t sendingCheckpointId = 123; + + STaskTriggerSendInfo triggerInfo = {.sendTs = taosGetTimestampMs(), .recved = false, .nodeId = downstreamNodeId }; + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); + + pTask->chkInfo.pActiveInfo->dispatchTrigger = true; + SArray* array1 = NULL; + code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); + + EXPECT_EQ(code, TSDB_CODE_SUCCESS); +} From b89532c83c42417696d4bf77b05165d63a0bff0e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Dec 2024 19:24:41 +0800 Subject: [PATCH 04/10] add ut --- source/libs/stream/test/streamCheckPointTest.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index cad02a514e..0e1d73fef8 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -198,6 +198,11 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { pTask->chkInfo.pActiveInfo->dispatchTrigger = true; SArray* array1 = NULL; code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1); - EXPECT_EQ(code, TSDB_CODE_SUCCESS); + + pTask->pMeta->fatalInfo.code = TSDB_CODE_SUCCESS; + streamSetFatalError(pTask->pMeta, code, __func__, __LINE__); + + pTask->pMeta->fatalInfo.code = TSDB_CODE_FAILED; + streamSetFatalError(pTask->pMeta, code, __func__, __LINE__); } From e7a6bc195a8d95745f4518493736d84ee449decb Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Dec 2024 08:38:57 +0800 Subject: [PATCH 05/10] adj ci --- tests/script/tsim/stream/streamInterpPrimaryKey2.sim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/script/tsim/stream/streamInterpPrimaryKey2.sim b/tests/script/tsim/stream/streamInterpPrimaryKey2.sim index f06e1ecd03..0574a1ceec 100644 --- a/tests/script/tsim/stream/streamInterpPrimaryKey2.sim +++ b/tests/script/tsim/stream/streamInterpPrimaryKey2.sim @@ -4,6 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql alter local 'streamCoverage' '1'; + print step1 print =============== create database sql create database test vgroups 1; From 5b1368db2ff93ae2354eb8a56bae4f55556d31d9 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Dec 2024 08:46:57 +0800 Subject: [PATCH 06/10] adj ci --- source/libs/stream/test/streamCheckPointTest.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 0e1d73fef8..8b15b33b2f 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -153,8 +153,9 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { int32_t downstreamNodeId = 1; int64_t sendingCheckpointId = 123; + TSKEY ts = taosGetTimestampMs(); - STaskTriggerSendInfo triggerInfo = {.sendTs = taosGetTimestampMs(), .recved = false, .nodeId = downstreamNodeId }; + STaskTriggerSendInfo triggerInfo = {.sendTs = ts, .recved = false, .nodeId = downstreamNodeId }; taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); pTask->chkInfo.pActiveInfo->dispatchTrigger = true; @@ -191,8 +192,9 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { int32_t downstreamNodeId = 1; int64_t sendingCheckpointId = 123; + TSKEY ts = taosGetTimestampMs(); - STaskTriggerSendInfo triggerInfo = {.sendTs = taosGetTimestampMs(), .recved = false, .nodeId = downstreamNodeId }; + STaskTriggerSendInfo triggerInfo = {.sendTs = ts, .recved = false, .nodeId = downstreamNodeId }; taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); pTask->chkInfo.pActiveInfo->dispatchTrigger = true; From 979a669ecd8d2e216b51a3797723af29a566f726 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Dec 2024 09:37:14 +0800 Subject: [PATCH 07/10] try fix window com error --- source/libs/stream/inc/streamInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 41ac0117f3..39a3804c59 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -114,7 +114,7 @@ typedef struct { int32_t sendCompleted; } STaskCheckpointReadyInfo; -typedef struct { +typedef struct STaskTriggerSendInfo { int64_t sendTs; int64_t recvTs; bool recved; From 9866b9437d8b16b0de671afba3ef10b613e62bc8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Dec 2024 11:27:29 +0800 Subject: [PATCH 08/10] try fix window com error --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/test/streamCheckPointTest.cpp | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 39a3804c59..41ac0117f3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -114,7 +114,7 @@ typedef struct { int32_t sendCompleted; } STaskCheckpointReadyInfo; -typedef struct STaskTriggerSendInfo { +typedef struct { int64_t sendTs; int64_t recvTs; bool recved; diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 8b15b33b2f..781a205dc7 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -155,7 +155,11 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { int64_t sendingCheckpointId = 123; TSKEY ts = taosGetTimestampMs(); - STaskTriggerSendInfo triggerInfo = {.sendTs = ts, .recved = false, .nodeId = downstreamNodeId }; + STaskTriggerSendInfo triggerInfo; + triggerInfo.sendTs = ts; + triggerInfo.recved = false; + triggerInfo.nodeId = downstreamNodeId; + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); pTask->chkInfo.pActiveInfo->dispatchTrigger = true; @@ -194,7 +198,11 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { int64_t sendingCheckpointId = 123; TSKEY ts = taosGetTimestampMs(); - STaskTriggerSendInfo triggerInfo = {.sendTs = ts, .recved = false, .nodeId = downstreamNodeId }; + STaskTriggerSendInfo triggerInfo; + triggerInfo.sendTs = ts; + triggerInfo.recved = false; + triggerInfo.nodeId = downstreamNodeId; + taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo); pTask->chkInfo.pActiveInfo->dispatchTrigger = true; From 52127109478e93b95cc20d7cc0d51849146023be Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Dec 2024 13:33:32 +0800 Subject: [PATCH 09/10] try fix window com error --- .../libs/stream/test/streamCheckPointTest.cpp | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 781a205dc7..218dfdb090 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -5,7 +5,7 @@ TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { SStreamTask* pTask = NULL; int64_t uid = 1111111111111111; - SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + SArray* array = taosArrayInit(4, POINTER_BYTES); int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -35,12 +35,13 @@ TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { ASSERT_EQ(code, TSDB_CODE_SUCCESS); tFreeStreamTask(pTask); + taosArrayDestroy(array); } TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) { SStreamTask* pTask = NULL; int64_t uid = 1111111111111111; - SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + SArray* array = taosArrayInit(4, POINTER_BYTES); int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -62,12 +63,14 @@ TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) { streamTaskSetFailedCheckpointId(pTask, pInfo->failedId - 1); ASSERT_EQ(pInfo->failedId, failedCheckpointId); tFreeStreamTask(pTask); + taosArrayDestroy(array); } TEST(UploadCheckpointDataTest, UploadSuccess) { + streamMetaInit(); SStreamTask* pTask = NULL; int64_t uid = 1111111111111111; - SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + SArray* array = taosArrayInit(4, POINTER_BYTES); int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -96,12 +99,16 @@ TEST(UploadCheckpointDataTest, UploadSuccess) { int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); EXPECT_EQ(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 on success"; + tFreeStreamTask(pTask); + taosRemoveDir(path); + streamStateClose(pState, true); + taosArrayDestroy(array); } TEST(UploadCheckpointDataTest, UploadDisabled) { SStreamTask* pTask = NULL; int64_t uid = 2222222222222; - SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + SArray* array = taosArrayInit(4, POINTER_BYTES); int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -131,12 +138,16 @@ TEST(UploadCheckpointDataTest, UploadDisabled) { int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type); EXPECT_NE(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 when backup type is disabled"; + + streamStateClose(pState, true); + tFreeStreamTask(pTask); + taosArrayDestroy(array); } TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { SStreamTask* pTask = NULL; int64_t uid = 2222222222222; - SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + SArray* array = taosArrayInit(4, POINTER_BYTES); int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -167,13 +178,14 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { EXPECT_TRUE(result) << "The trigger message should have been sent to the downstream node"; - taosArrayDestroy(pTask->chkInfo.pActiveInfo->pDispatchTriggerList); + tFreeStreamTask(pTask); + taosArrayDestroy(array); } TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { SStreamTask* pTask = NULL; int64_t uid = 2222222222222; - SArray* array = taosArrayInit(4, sizeof(SStreamTask)); + SArray* array = taosArrayInit(4, POINTER_BYTES); int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, false, 0, 0, array, false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -215,4 +227,7 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { pTask->pMeta->fatalInfo.code = TSDB_CODE_FAILED; streamSetFatalError(pTask->pMeta, code, __func__, __LINE__); + tFreeStreamTask(pTask); + taosArrayDestroy(array); + taosArrayDestroy(array1); } From 89dc02012b14df20b71ffa0bc5115cbf9a9ed7c1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 20 Dec 2024 14:23:04 +0800 Subject: [PATCH 10/10] try fix window com error --- .../libs/stream/test/streamCheckPointTest.cpp | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/source/libs/stream/test/streamCheckPointTest.cpp b/source/libs/stream/test/streamCheckPointTest.cpp index 218dfdb090..80dd3ec142 100644 --- a/source/libs/stream/test/streamCheckPointTest.cpp +++ b/source/libs/stream/test/streamCheckPointTest.cpp @@ -2,6 +2,31 @@ #include "tstream.h" #include "streamInt.h" +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wformat" +#pragma GCC diagnostic ignored "-Wint-to-pointer-cast" +#pragma GCC diagnostic ignored "-Wpointer-arith" + +void initTaskLock(SStreamTask* pTask) { + TdThreadMutexAttr attr = {0}; + int32_t code = taosThreadMutexAttrInit(&attr); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = taosThreadMutexInit(&pTask->lock, &attr); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); + + code = taosThreadMutexAttrDestroy(&attr); + ASSERT_EQ(code, TSDB_CODE_SUCCESS); +} + TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { SStreamTask* pTask = NULL; int64_t uid = 1111111111111111; @@ -10,6 +35,8 @@ TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) { false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); + initTaskLock(pTask); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -46,6 +73,8 @@ TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) { false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); + initTaskLock(pTask); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -75,6 +104,8 @@ TEST(UploadCheckpointDataTest, UploadSuccess) { false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); + initTaskLock(pTask); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -113,6 +144,8 @@ TEST(UploadCheckpointDataTest, UploadDisabled) { false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); + initTaskLock(pTask); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -152,6 +185,8 @@ TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) { false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); + initTaskLock(pTask); + code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo); ASSERT_EQ(code, TSDB_CODE_SUCCESS); @@ -190,6 +225,8 @@ TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) { false, 1, &pTask); ASSERT_EQ(code, TSDB_CODE_SUCCESS); + initTaskLock(pTask); + const char *path = "/tmp/backend5/stream"; code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta); ASSERT_EQ(code, TSDB_CODE_SUCCESS);