From 4c27af9b13837a5ee0cc5b66c9b21e0baa450df0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 13:25:39 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 7 +++++++ source/dnode/vnode/src/tq/tqSink.c | 8 ++++---- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bf2f14339d..9de682dd3a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e8c22c8de2..b52f98adbf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1503,6 +1503,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + // sink_quota char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; sinkQuota[0] = '0'; char dstStr[20] = {0}; @@ -1510,6 +1511,12 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint interval + char tmp[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(tmp, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); + // checkpoint backup type char backup[20 + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(backup, "none") diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index af8f567200..34171a5872 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -333,8 +333,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen && k < oldLen) { - SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j); - SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k); + SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j); + SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k); if (pNewRow->ts <= pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); @@ -374,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen) { - SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++); + SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++); taosArrayPush(pFinal, &pRow); } while (k < oldLen) { - SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++); + SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++); taosArrayPush(pFinal, &pRow); }