From b2d8260f14e5d7a50249b59f27d0ed83c4ae0882 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 10:05:22 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 7 +++++++ source/libs/stream/inc/streamInt.h | 2 -- source/libs/stream/src/streamCheckpoint.c | 8 ++++---- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c74a9fd7b..119a77cbe8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -547,7 +547,7 @@ typedef struct SStreamMeta { SArray* chkpSaved; SArray* chkpInUse; SRWLatch chkpDirLock; - void* qHandle; + void* qHandle; // todo remove it void* bkdChkptMgt; } SStreamMeta; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 14e8088dfe..7c868fcbe4 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 844aae0f57..f6d86ad317 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint backup type + char backup[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(backup, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)backup, false); + + // history scan idle char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; strcpy(scanHistoryIdle, "100a"); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 45a75ea5e7..0ee31197dc 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path); int32_t deleteCheckpoint(char* id); -int32_t deleteCheckpointFile(char* id, char* name); -//int32_t downloadCheckpointDataByName(const char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8244df2995..3428fc36e1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -27,6 +27,7 @@ typedef struct { } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); +static int32_t deleteCheckpointFile(char* id, char* name); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -461,8 +462,7 @@ int32_t uploadCheckpointData(void* param) { return code; } -int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { - // async upload +int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { return 0; @@ -518,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadChkp(pTask, ckId, (char*)id); + code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } @@ -589,8 +589,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { stDebug("[s3] upload checkpoint:%s", filename); // break; } - taosCloseDir(&pDir); + taosCloseDir(&pDir); return 0; }