rm dup data

This commit is contained in:
yihaoDeng 2023-11-14 17:43:13 +08:00
parent 68b6cc1d9d
commit 4476e46a5f
5 changed files with 129 additions and 44 deletions

View File

@ -43,6 +43,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
int32_t s3GetObjectToFile(const char *object_name, char *fileName);
#ifdef __cplusplus
}

View File

@ -77,7 +77,7 @@ typedef struct {
SArray* chkpInUse;
int32_t chkpCap;
TdThreadRwlock chkpDirLock;
int64_t dataWritten;
int64_t dataWritten;
} STaskDbWrapper;
@ -255,5 +255,5 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
#endif

View File

@ -163,6 +163,7 @@ int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int downloadCheckpointByName(char* id, char* fname, char* dstName);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -1719,7 +1719,8 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code;
}
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) {
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) {
int32_t code = 0;
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
char* temp = taosMemoryCalloc(1, strlen(pDb->path));
@ -1731,20 +1732,20 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
} else {
taosMkDir(temp);
}
bkdMgtGetDelta(p, pDb->idstr, chkpId, NULL, temp);
code = bkdMgtGetDelta(p, pDb->idstr, chkpId, list, temp);
*path = temp;
return 0;
return code;
}
// Generate the data to upload for checkpoint `chkpId`, dispatching on the upload type:
//   UPLOAD_RSYNC -> taskDbGenChkpUploadData__rsync(pDb, chkpId, path)
//   UPLOAD_S3    -> taskDbGenChkpUploadData__s3(...), which additionally receives `mgt`
// `arg` is used as the STaskDbWrapper*; `type` is interpreted as an UPLOAD_TYPE.
// Returns the selected helper's result, or -1 for any other upload type.
// NOTE(review): this hunk appears to show both the previous and the updated signature /
// S3 call (the updated ones carry the extra `SArray* list` argument) — confirm against the file.
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) {
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
STaskDbWrapper* pDb = arg;
UPLOAD_TYPE utype = type;
if (utype == UPLOAD_RSYNC) {
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == UPLOAD_S3) {
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path);
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
}
// unknown upload type
return -1;
}
@ -3559,7 +3560,7 @@ SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
p->curChkpId = initChkpId;
p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->path = path;
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
@ -3597,9 +3598,9 @@ int32_t dbChkpInit(SDbChkp* p) {
if (p == NULL) return 0;
return 0;
}
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
taosThreadRwlockRdlock(&p->rwLock);
int32_t code = 0;
int32_t code = -1;
int32_t len = p->len + 128;
char* srcBuf = taosMemoryCalloc(1, len);
@ -3613,26 +3614,9 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
if (!taosDirExist(srcDir)) {
stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
code = -1;
goto _ERROR;
}
// code = taosMkDir(dstDir);
// if (code != 0) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr());
// goto _ERROR;
// }
// clear current file
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosRemoveFile(dstBuf);
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosRemoveFile(dstBuf);
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
memset(srcBuf, 0, len);
@ -3644,39 +3628,59 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
goto _ERROR;
}
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(p->pDel, i);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosRemoveFile(dstBuf);
char* p = taosStrdup(filename);
taosArrayPush(list, &p);
}
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
goto _ERROR;
}
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
goto _ERROR;
}
static char* chkpMeta = "META";
memset(dstBuf, 0, len);
sprintf(dstDir, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
stError("chkp failed to create meta file: %s", dstDir);
goto _ERROR;
}
char content[128] = {0};
snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, p->pManifest,
p->curChkpId);
if (taosWriteFile(pFile, content, strlen(content)) <= 0) {
stError("chkp failed to write meta file: %s", dstDir);
taosCloseFile(&pFile);
goto _ERROR;
}
taosCloseFile(&pFile);
// clear delta data buf
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
code = 0;
_ERROR:
taosThreadRwlockUnlock(&p->rwLock);
@ -3718,22 +3722,22 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL;
if (pChkp == NULL) {
char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64);
sprintf(taskPath, "%s%s%s", bm->path, TD_DIRSEP, taskId);
char* path = taosMemoryCalloc(1, strlen(bm->path) + 64);
sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId);
SDbChkp* p = dbChkpCreate(taskPath, chkpId);
SDbChkp* p = dbChkpCreate(path, chkpId);
taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*));
taosMemoryFree(taskPath);
pChkp = p;
code = dbChkpDumpTo(pChkp, dname);
code = dbChkpDumpTo(pChkp, dname, list);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
code = dbChkpGetDelta(pChkp, chkpId, list);
code = dbChkpDumpTo(pChkp, dname);
code = dbChkpGetDelta(pChkp, chkpId, NULL);
code = dbChkpDumpTo(pChkp, dname, list);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
@ -3763,7 +3767,7 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
taosThreadRwlockRdlock(&bm->rwLock);
SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpDumpTo(p, dname);
code = dbChkpDumpTo(p, dname, NULL);
taosThreadRwlockUnlock(&bm->rwLock);
return code;

View File

@ -338,18 +338,72 @@ void streamTaskSetFailedId(SStreamTask* pTask) {
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
}
/**
 * Download the remote "META" descriptor of task `id` and append every file
 * name it lists to `list`.
 *
 * The descriptor is fetched into the temporary file "<path>/META_TMP", read,
 * and removed again. Each non-empty line of the descriptor becomes one
 * heap-allocated, NUL-terminated string whose pointer is pushed onto `list`
 * (an array of char*); ownership of those strings passes to `list`.
 *
 * @param id    task id used to locate the remote checkpoint
 * @param path  local directory in which the temporary file is created
 * @param list  array of char* receiving the listed file names
 * @return 0 on success; the downloadCheckpointByName() code when the download
 *         fails; -1 on allocation, open or read failure.
 */
int32_t getChkpMeta(char* id, char* path, SArray* list) {
  size_t fileLen = strlen(path) + 32;
  char*  file = taosMemoryCalloc(1, fileLen);
  if (file == NULL) {
    stError("chkp failed to alloc meta file name under:%s", path);
    return -1;
  }
  snprintf(file, fileLen, "%s%s%s", path, TD_DIRSEP, "META_TMP");

  int32_t code = downloadCheckpointByName(id, "META", file);
  if (code != 0) {
    stDebug("chkp failed to download meta file:%s", file);
    taosMemoryFree(file);
    return code;
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    stError("chkp failed to open meta file:%s", file);
    taosRemoveFile(file);
    taosMemoryFree(file);
    return -1;
  }

  char buf[128] = {0};
  // Read at most sizeof(buf) - 1 bytes so the zero-initialised buffer always
  // stays NUL-terminated for the string scanning below.
  if (taosReadFile(pFile, buf, sizeof(buf) - 1) <= 0) {
    stError("chkp failed to read meta file:%s", file);
    code = -1;
  } else {
    // One file name per line; blank lines (e.g. from a trailing '\n') are skipped.
    const char* cur = buf;
    while (*cur != '\0') {
      const char* eol = strchr(cur, '\n');
      size_t      n = (eol != NULL) ? (size_t)(eol - cur) : strlen(cur);
      if (n > 0) {
        char* item = taosMemoryCalloc(1, n + 1);
        if (item == NULL) {
          stError("chkp failed to alloc meta item from file:%s", file);
          code = -1;
          break;
        }
        memcpy(item, cur, n);
        taosArrayPush(list, &item);
      }
      if (eol == NULL) {
        break;
      }
      cur = eol + 1;
    }
  }

  taosCloseFile(&pFile);
  taosRemoveFile(file);
  taosMemoryFree(file);
  return code;
}
int32_t doUploadChkp(void* param) {
SAsyncUploadArg* arg = param;
char* path = NULL;
int32_t code = 0;
SArray* list = taosArrayInit(4, sizeof(void*));
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path)) != 0) {
(int8_t)(arg->type), &path, list)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) {
code = getChkpMeta(arg->taskId, path, list);
if (code != 0) {
code = 0;
}
if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
}
if (code == 0) {
for (int i = 0; i < taosArrayGetSize(list); i++) {
char* p = taosArrayGetP(list, i);
code = deleteCheckpointFile(arg->taskId, p);
stDebug("try to del file: %s", p);
if (code != 0) {
break;
}
}
}
taosArrayDestroyP(list, taosMemoryFree);
taosRemoveDir(path);
taosMemoryFree(path);
taosMemoryFree(arg->taskId);
@ -464,6 +518,17 @@ static int uploadCheckpointToS3(char* id, char* path) {
return 0;
}
/**
 * Fetch the S3 object "<id>/<fname>" into the local file `dstName`.
 *
 * @param id       task id, used as the object key prefix
 * @param fname    object name below that prefix
 * @param dstName  local file name handed to s3GetObjectToFile()
 * @return 0 on success, -1 if the key buffer cannot be allocated or
 *         s3GetObjectToFile() reports a failure.
 */
static int downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
  // The key is "<id>/<fname>": size the buffer from what is actually written
  // (+1 for '/', +1 for the terminating NUL), not from dstName.
  size_t keyLen = strlen(id) + strlen(fname) + 2;
  char*  key = taosMemoryCalloc(1, keyLen);
  if (key == NULL) {
    return -1;
  }
  snprintf(key, keyLen, "%s/%s", id, fname);

  int code = (s3GetObjectToFile(key, dstName) != 0) ? -1 : 0;

  taosMemoryFree(key);
  return code;
}
UPLOAD_TYPE getUploadType() {
if (strlen(tsSnodeAddress) != 0) {
return UPLOAD_RSYNC;
@ -487,6 +552,20 @@ int uploadCheckpoint(char* id, char* path) {
return 0;
}
/**
 * Download a single named checkpoint file of task `id` to `dstName`.
 *
 * @param id       task id; must be non-empty
 * @param fname    remote file name (e.g. CURRENT); must be non-empty and
 *                 shorter than PATH_MAX
 * @param dstName  local destination file name; must be non-empty
 * @return -1 on invalid parameters; 0 when an snode address is configured
 *         (nothing is downloaded on that path); the result of
 *         downloadCheckpointByNameS3() when S3 streaming is enabled;
 *         0 otherwise.
 */
int downloadCheckpointByName(char* id, char* fname, char* dstName) {
  if (id == NULL || fname == NULL || dstName == NULL || strlen(id) == 0 || strlen(fname) == 0 ||
      strlen(dstName) == 0 || strlen(fname) >= PATH_MAX) {
    stError("downloadCheckpointByName parameters invalid");
    return -1;
  }

  if (strlen(tsSnodeAddress) != 0) {
    // rsync/snode mode: no per-file download is performed here
    return 0;
  } else if (tsS3StreamEnabled) {
    return downloadCheckpointByNameS3(id, fname, dstName);
  }
  return 0;
}
int downloadCheckpoint(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("downloadCheckpoint parameters invalid");