Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-10 18:37:25 +08:00
parent a3fdb6dd59
commit 1e8d46c405
2 changed files with 24 additions and 17 deletions

View File

@ -1722,7 +1722,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
char* temp = taosMemoryCalloc(1, strlen(pDb->path));
sprintf(temp, "%s%s%s", pDb->path, TD_DIRSEP, "tmp");
sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
if (taosDirExist(temp)) {
taosRemoveDir(temp);
@ -3363,16 +3363,16 @@ _err:
return code >= 0 ? 0 : -1;
}
int32_t isBkdDataMeta(char* name) {
int32_t isBkdDataMeta(char* name, int32_t len) {
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
const char* pManifest = "MANIFEST-";
int32_t maniLen = strlen(pManifest);
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
if (len >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
return 1;
} else if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
} else if (len == currLen && strcmp(name, pCurrent) == 0) {
return 1;
}
return 0;
@ -3383,9 +3383,10 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
void* pIter = taosHashIterate(p2, NULL);
while (pIter) {
char* name = taosHashGetKey(pIter, &len);
if (!isBkdDataMeta(name) && !taosHashGet(p1, name, len)) {
char* p = taosStrdup(name);
taosArrayPush(diff, &p);
if (!isBkdDataMeta(name, len) && !taosHashGet(p1, name, len)) {
char* fname = taosMemoryCalloc(1, len + 1);
strncpy(fname, name, len);
taosArrayPush(diff, &fname);
}
pIter = taosHashIterate(p2, pIter);
}
@ -3429,18 +3430,17 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(p->pCurrent);
p->pCurrent = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
// taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(p->pManifest);
p->pManifest = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
// taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
// char* p = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
@ -3448,18 +3448,23 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
char *name = taosHashGetKey(pIter, NULL);
stError("curr file list: %s", name);
size_t len = 0;
char* name = taosHashGetKey(pIter, &len);
char* buf = taosMemoryCalloc(1, len + 1);
strncpy(buf, name, len);
stError("curr file list: %s", buf);
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
if (p->init == 0) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
size_t len;
size_t len = 0;
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && !isBkdDataMeta(name)) {
char* fname = taosStrdup(name);
if (name != NULL && !isBkdDataMeta(name, len)) {
char* fname = taosMemoryCalloc(1, len + 1);
strncpy(fname, name, len);
taosArrayPush(p->pAdd, &fname);
}
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
@ -3579,8 +3584,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
char* filename = taosArrayGetP(p->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);

View File

@ -348,7 +348,7 @@ int32_t doUploadChkp(void* param) {
if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
}
taosRemoveDir(path);
taosMemoryFree(path);
taosMemoryFree(arg->taskId);
taosMemoryFree(arg);
@ -435,6 +435,7 @@ static int uploadCheckpointToS3(char* id, char* path) {
if (pDir == NULL) return -1;
TdDirEntryPtr de = NULL;
s3Init();
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
@ -454,6 +455,7 @@ static int uploadCheckpointToS3(char* id, char* path) {
return -1;
}
stDebug("[s3] upload checkpoint:%s", filename);
break;
}
taosCloseDir(&pDir);