refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-28 10:05:22 +08:00
parent 8adaeb59a3
commit b2d8260f14
5 changed files with 13 additions and 7 deletions

View File

@ -547,7 +547,7 @@ typedef struct SStreamMeta {
SArray* chkpSaved; SArray* chkpSaved;
SArray* chkpInUse; SArray* chkpInUse;
SRWLatch chkpDirLock; SRWLatch chkpDirLock;
void* qHandle; void* qHandle; // todo remove it
void* bkdChkptMgt; void* bkdChkptMgt;
} SStreamMeta; } SStreamMeta;

View File

@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
}; };

View File

@ -1495,6 +1495,13 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
// checkpoint backup type
char backup[20 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(backup, "none")
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
// history scan idle
char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
strcpy(scanHistoryIdle, "100a"); strcpy(scanHistoryIdle, "100a");

View File

@ -160,8 +160,6 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t streamTaskBackupCheckpoint(char* id, char* path);
int32_t downloadCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path);
int32_t deleteCheckpoint(char* id); int32_t deleteCheckpoint(char* id);
int32_t deleteCheckpointFile(char* id, char* name);
//int32_t downloadCheckpointDataByName(const char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -27,6 +27,7 @@ typedef struct {
} SAsyncUploadArg; } SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(char* id, char* name);
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
@ -461,8 +462,7 @@ int32_t uploadCheckpointData(void* param) {
return code; return code;
} }
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) { if (type == DATA_UPLOAD_DISABLE) {
return 0; return 0;
@ -518,7 +518,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId); code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadChkp(pTask, ckId, (char*)id); code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
} }
@ -589,8 +589,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
stDebug("[s3] upload checkpoint:%s", filename); stDebug("[s3] upload checkpoint:%s", filename);
// break; // break;
} }
taosCloseDir(&pDir);
taosCloseDir(&pDir);
return 0; return 0;
} }