diff --git a/include/common/cos.h b/include/common/cos.h index 21b645f604..c6b159c1da 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -43,6 +43,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); +int32_t s3GetObjectToFile(const char *object_name, char *fileName); #ifdef __cplusplus } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 92ec1899db..47404f311f 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -77,7 +77,7 @@ typedef struct { SArray* chkpInUse; int32_t chkpCap; TdThreadRwlock chkpDirLock; - int64_t dataWritten; + int64_t dataWritten; } STaskDbWrapper; @@ -255,5 +255,5 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); -int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj); +int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); #endif \ No newline at end of file diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bbe7bcf65c..c299d0cfe1 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -163,6 +163,7 @@ int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); int deleteCheckpointFile(char* id, char* name); +int downloadCheckpointByName(char* id, char* fname, char* dstName); int32_t onNormalTaskReady(SStreamTask* pTask); int32_t onScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5c59621ca9..b156bce5dd 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1719,7 +1719,8 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char return code; } -int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) { +int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) { + int32_t code = 0; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; char* temp = taosMemoryCalloc(1, strlen(pDb->path)); @@ -1731,20 +1732,20 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 } else { taosMkDir(temp); } - bkdMgtGetDelta(p, pDb->idstr, chkpId, NULL, temp); + code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp); *path = temp; - return 0; + return code; } -int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) { +int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { STaskDbWrapper* pDb = arg; UPLOAD_TYPE utype = type; if (utype == UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); } else if (utype == UPLOAD_S3) { - return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path); + return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); } return -1; } @@ -3559,7 +3560,7 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { p->curChkpId = initChkpId; p->preCkptId = -1; p->pSST = taosArrayInit(64, sizeof(void*)); - p->path = taosStrdup(path); + p->path = path; p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); @@ -3597,9 +3598,9 @@ int32_t dbChkpInit(SDbChkp* p) { if (p == NULL) return 0; return 0; } -int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { +int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { taosThreadRwlockRdlock(&p->rwLock); - int32_t code = 0; + int32_t code = -1; int32_t len = p->len + 128; char* srcBuf = taosMemoryCalloc(1, len); @@ -3613,26 +3614,9 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); - code = -1; goto _ERROR; } - // code = taosMkDir(dstDir); - // if (code != 0) { - // terrno = TAOS_SYSTEM_ERROR(errno); - // stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr()); - // goto _ERROR; - // } - - // clear current file - memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); - taosRemoveFile(dstBuf); - - memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); - taosRemoveFile(dstBuf); - // add file to $name dir for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { memset(srcBuf, 0, len); @@ -3644,39 +3628,59 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { if (taosCopyFile(srcBuf, dstBuf) < 0) { stError("failed to copy file from %s to %s", srcBuf, dstBuf); + goto _ERROR; } } // del file in $name for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { - memset(dstBuf, 0, len); - memset(srcBuf, 0, len); - char* filename = taosArrayGetP(p->pDel, i); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); - taosRemoveFile(dstBuf); + char* p = taosStrdup(filename); + taosArrayPush(list, &p); } // copy current file to dst dir memset(srcBuf, 0, len); memset(dstBuf, 0, len); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); + sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId); if (taosCopyFile(srcBuf, dstBuf) < 0) { stError("failed to copy file from %s to %s", srcBuf, dstBuf); + goto _ERROR; } // copy manifest file to dst dir memset(srcBuf, 0, len); memset(dstBuf, 0, len); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); + sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId); if (taosCopyFile(srcBuf, dstBuf) < 0) { stError("failed to copy file from %s to %s", srcBuf, dstBuf); + goto _ERROR; } + static char* chkpMeta = "META"; + memset(dstBuf, 0, len); + sprintf(dstDir, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta); + + TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + stError("chkp failed to create meta file: %s", dstDir); + goto _ERROR; + } + char content[128] = {0}; + snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, p->pManifest, + p->curChkpId); + if (taosWriteFile(pFile, content, strlen(content)) <= 0) { + stError("chkp failed to write meta file: %s", dstDir); + taosCloseFile(&pFile); + goto _ERROR; + } + taosCloseFile(&pFile); + // clear delta data buf taosArrayClearP(p->pAdd, taosMemoryFree); taosArrayClearP(p->pDel, taosMemoryFree); + code = 0; _ERROR: taosThreadRwlockUnlock(&p->rwLock); @@ -3718,22 +3722,22 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL; if (pChkp == NULL) { - char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64); - sprintf(taskPath, "%s%s%s", bm->path, TD_DIRSEP, taskId); + char* path = taosMemoryCalloc(1, strlen(bm->path) + 64); + sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId); - SDbChkp* p = dbChkpCreate(taskPath, chkpId); + SDbChkp* p = dbChkpCreate(path, chkpId); taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)); - taosMemoryFree(taskPath); pChkp = p; - code = dbChkpDumpTo(pChkp, dname); + code = dbChkpDumpTo(pChkp, dname, list); taosThreadRwlockUnlock(&bm->rwLock); return code; } - code = dbChkpGetDelta(pChkp, chkpId, list); - code = dbChkpDumpTo(pChkp, dname); + code = dbChkpGetDelta(pChkp, chkpId, NULL); + code = dbChkpDumpTo(pChkp, dname, list); + taosThreadRwlockUnlock(&bm->rwLock); return code; } @@ -3763,7 +3767,7 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) { taosThreadRwlockRdlock(&bm->rwLock); SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); - code = dbChkpDumpTo(p, dname); + code = dbChkpDumpTo(p, dname, NULL); taosThreadRwlockUnlock(&bm->rwLock); return code; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f7ac9e61bc..b61256371b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -338,18 +338,72 @@ void streamTaskSetFailedId(SStreamTask* pTask) { pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId; } +int32_t getChkpMeta(char* id, char* path, SArray* list) { + char* file = taosMemoryCalloc(1, strlen(path) + 32); + sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + int32_t code = downloadCheckpointByName(id, "META", file); + if (code != 0) { + stDebug("chkp failed to download meta file:%s", file); + taosMemoryFree(file); + return code; + } + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + char buf[128] = {0}; + if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { + stError("chkp failed to read meta file:%s", file); + code = -1; + } else { + int32_t len = strlen(buf); + for (int i = 0; i < len; i++) { + if (buf[i] == '\n') { + char* item = taosMemoryCalloc(1, i + 1); + memcpy(item, buf, i); + taosArrayPush(list, &item); + + item = taosMemoryCalloc(1, len - i); + memcpy(item, buf + i + 1, len - i - 1); + taosArrayPush(list, &item); + } + } + } + taosCloseFile(&pFile); + taosRemoveFile(file); + taosMemoryFree(file); + return code; +} int32_t doUploadChkp(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; int32_t code = 0; + SArray* list = taosArrayInit(4, sizeof(void*)); if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, - (int8_t)(arg->type), &path)) != 0) { + (int8_t)(arg->type), &path, list)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } - if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) { + + code = getChkpMeta(arg->taskId, path, list); + if (code != 0) { + code = 0; + } + + if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) { stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); } + + if (code == 0) { + for (int i = 0; i < taosArrayGetSize(list); i++) { + char* p = taosArrayGetP(list, i); + code = deleteCheckpointFile(arg->taskId, p); + stDebug("try to del file: %s", p); + if (code != 0) { + break; + } + } + } + + taosArrayDestroyP(list, taosMemoryFree); + taosRemoveDir(path); taosMemoryFree(path); taosMemoryFree(arg->taskId); @@ -464,6 +518,17 @@ static int uploadCheckpointToS3(char* id, char* path) { return 0; } +static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { + int code = 0; + char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + sprintf(buf, "%s/%s", id, fname); + if (s3GetObjectToFile(buf, dstName) != 0) { + code = -1; + } + taosMemoryFree(buf); + return code; +} + UPLOAD_TYPE getUploadType() { if (strlen(tsSnodeAddress) != 0) { return UPLOAD_RSYNC; @@ -487,6 +552,20 @@ int uploadCheckpoint(char* id, char* path) { return 0; } +// fileName: CURRENT +int downloadCheckpointByName(char* id, char* fname, char* dstName) { + if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { + stError("uploadCheckpointByName parameters invalid"); + return -1; + } + if (strlen(tsSnodeAddress) != 0) { + return 0; + } else if (tsS3StreamEnabled) { + return downloadCheckpointByNameS3(id, fname, dstName); + } + return 0; +} + int downloadCheckpoint(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("downloadCheckpoint parameters invalid");