From 9fd0bff673e948659bff7ff9377c571855239504 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 14:57:26 +0800 Subject: [PATCH 1/3] fix invalid insert --- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 20d25dbceb..797c968dcd 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -251,11 +251,13 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t } tDecoderClear(&decoder); // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), - (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { + int64_t key[2] = {pTask->id.streamId, pTask->id.taskId}; + if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), + nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { taosMemoryFree(pTask); return -1; } + // mem leak or not ? taosMemoryFree(pTask); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing From 1833be028fa677daddf1d194e232753a4327e36a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 15:28:31 +0800 Subject: [PATCH 2/3] fix invalid insert --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 14 ++++---------- source/libs/stream/src/streamTask.c | 14 +++++++++++++- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d880ae202e..f46a814c44 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -423,6 +423,7 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); bool streamQueueIsFull(const STaosQueue* pQueue); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 797c968dcd..86433e4652 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -236,29 +236,23 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t STqHandle handle; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; if (pHdr->type == SNAP_DATA_STREAM_TASK) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } + SStreamTaskId task = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - code = tDecodeStreamTask(&decoder, pTask); + + code = tDecodeStreamTaskId(&decoder, &task); if (code < 0) { tDecoderClear(&decoder); - taosMemoryFree(pTask); goto _err; } tDecoderClear(&decoder); // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - int64_t key[2] = {pTask->id.streamId, pTask->id.taskId}; + int64_t key[2] = {task.streamId, task.taskId}; if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { - taosMemoryFree(pTask); return -1; } - // mem leak or not ? - taosMemoryFree(pTask); } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { // do nothing } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6eb09b95ec..247e79ee8c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,6 +251,18 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) tEndDecode(pDecoder); return 0; } +int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId) { + int64_t ver; + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &ver) < 0) return -1; + if (ver != SSTREAM_TASK_VER) return -1; + + if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTaskId->taskId) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} static void freeItem(void* p) { SStreamContinueExecInfo* pInfo = p; @@ -538,4 +550,4 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) { } qDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr); -} \ No newline at end of file +} From 9953a400ec6ecbdf26726ed42c97bc1787f145a2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 15:54:46 +0800 Subject: [PATCH 3/3] change chkp interval --- source/common/src/tglobal.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 02c56cc40f..ff9e922ee1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -128,7 +128,7 @@ int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT bool tsEnableQueryHb = true; -bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true +bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -240,11 +240,11 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; -int32_t tsStreamCheckpointTickInterval = 20; +int32_t tsStreamCheckpointTickInterval = 600; int32_t tsStreamNodeCheckInterval = 10; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; -int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups +int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits