Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-10 12:07:33 +08:00
parent 54fe42d372
commit ad119ea4c0
1 changed files with 15 additions and 8 deletions

View File

@ -1691,7 +1691,7 @@ void taskDbDestroy(void* pDb) {
return;
}
int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pDb->refId;
@ -1718,7 +1718,7 @@ int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code;
}
int32_t taskDbGenChkpUplaodPath__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) {
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) {
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
char* temp = taosMemoryCalloc(1, strlen(pDb->path));
@ -1727,6 +1727,8 @@ int32_t taskDbGenChkpUplaodPath__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
if (taosDirExist(temp)) {
taosRemoveDir(temp);
taosMkDir(temp);
} else {
taosMkDir(temp);
}
bkdMgtGetDelta(p, pDb->idstr, chkpId, NULL, temp);
@ -1739,9 +1741,9 @@ int32_t taskDbGenChkpUploadPath(void* arg, void* mgt, int64_t chkpId, int8_t typ
UPLOAD_TYPE utype = type;
if (utype == UPLOAD_RSYNC) {
return taskDbGenChkpUplaodPath__rsync(pDb, chkpId, path);
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == UPLOAD_S3) {
return taskDbGenChkpUplaodPath__s3(pDb, mgt, chkpId, path);
return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path);
}
return -1;
}
@ -3399,7 +3401,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
int32_t sstLen = strlen(pSST);
memset(p->buf, 0, p->len);
sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId);
sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId);
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
@ -3527,8 +3529,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname);
sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s", dname);
if (!taosDirExist(srcDir)) {
stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
@ -3602,6 +3604,7 @@ _ERROR:
SBkdMgt* bkdMgtCreate(char* path) {
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
p->path = taosStrdup(path);
taosThreadRwlockInit(&p->rwLock, NULL);
return p;
}
@ -3624,7 +3627,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
SDbChkp* pChkp = NULL;
if (pChkp == NULL) {
char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64);
@ -3639,9 +3643,12 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
code = dbChkpDumpTo(pChkp, dname);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
} else {
pChkp = *ppChkp;
}
code = dbChkpGetDelta(pChkp, chkpId, list);
code = dbChkpDumpTo(pChkp, dname);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}