From 230486df13aa87a70bb235a20c5239ab8cbf1343 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 17:02:27 +0800 Subject: [PATCH 1/3] fix(stream): update stream task meta info. --- source/common/src/systable.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 4dfe9d900f..14e8088dfe 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -191,8 +191,8 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, - {.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, From b1b324606606ddfa1934d2dc732659706c038735 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 17:03:01 +0800 Subject: [PATCH 2/3] fix(stream): update the checkpoint executed transId range. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamCheckStatus.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cf985cd164..fe7daf2bf1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1165,7 +1165,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } else { // checkpoint already finished, and not in checkpoint status - if (req.checkpointId == pTask->chkInfo.checkpointId) { + if (req.checkpointId <= pTask->chkInfo.checkpointId) { tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 " transId:%d already handled, ignore and discard", pTask->id.idStr, req.checkpointId, req.transId); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 0f4662594f..d164d01934 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -338,7 +338,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* } if (!pInfo->inCheckProcess) { - stWarn("s-task:%s already not in-check-procedure", id); +// stWarn("s-task:%s already not in-check-procedure", id); } int64_t el = (pInfo->startTs != 0) ? (taosGetTimestampMs() - pInfo->startTs) : 0; From 6d06188b9fc28005d10511e5e388c26f6ea9d542 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 17:06:15 +0800 Subject: [PATCH 3/3] fix(stream): fix memory leak. --- source/dnode/vnode/src/tq/tqSink.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f690b9b277..25c2f32307 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -485,6 +485,7 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) { + taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it return TSDB_CODE_FAILED; }