add self check

This commit is contained in:
Yihao Deng 2024-06-28 02:58:30 +00:00
parent de77ce6480
commit 51e4abe256
1 changed files with 130 additions and 49 deletions

View File

@ -1354,37 +1354,45 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId)
return -1;
}
char* pDst = taosMemoryCalloc(1, len + 64);
int32_t cap = len + 64;
char* pDst = taosMemoryCalloc(1, cap);
if (pDst == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("failed to alloc memory to load extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
if (nBytes != strlen(pDst)) {
code = -1;
nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
pFile = taosOpenFile(pDst, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to open file to load extra info, file:%s", pDst);
if (errno == ENOENT) {
// compatible with previous version
*processId = -1;
code = 0;
goto _EXIT;
} else {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to open file to load extra info, file:%s", pDst);
}
goto _EXIT;
}
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
code = -1;
goto _EXIT;
}
if (sscanf(buf, "%" PRId64 " %" PRId64 "", chkpId, processId) < 2) {
terrno = TSDB_CODE_INVALID_PARA;
stError("failed to read file content to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
goto _EXIT;
}
code = 0;
_EXIT:
@ -1406,16 +1414,16 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
stError("failed to add extra info, dir:%s, reason:%s", pChkpIdDir, tstrerror(terrno));
return -1;
}
char* pDst = taosMemoryCalloc(1, len + 64);
int32_t cap = len + 64;
char* pDst = taosMemoryCalloc(1, cap);
if (pDst == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("failed to alloc memory to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
if (nBytes != strlen(pDst)) {
nBytes = snprintf(pDst, cap, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
if (nBytes <= 0 || nBytes >= cap) {
stError("failed to build dst to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
@ -1428,8 +1436,8 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
}
nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId);
if (nBytes != strlen(buf)) {
code = -1;
if (nBytes <= 0 || nBytes >= sizeof(buf)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
stError("failed to build content to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
@ -2475,7 +2483,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
nBytes =
snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (nBytes != strlen(buf)) {
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
@ -4311,6 +4319,7 @@ void dbChkpDebugInfo(SDbChkp* pDb) {
}
}
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
int32_t nBytes;
taosThreadRwlockWrlock(&p->rwLock);
p->preCkptId = p->curChkpId;
@ -4368,6 +4377,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && !isBkdDataMeta(name, len)) {
char* fname = taosMemoryCalloc(1, len + 1);
if (fname == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadRwlockUnlock(&p->rwLock);
return -1;
}
strncpy(fname, name, len);
taosArrayPush(p->pAdd, &fname);
}
@ -4496,30 +4510,32 @@ int32_t dbChkpInit(SDbChkp* p) {
}
#endif
int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
static char* chkpMeta = "META";
int32_t code = -1;
int32_t cap = p->len + 128;
taosThreadRwlockRdlock(&p->rwLock);
int32_t code = -1;
int32_t len = p->len + 128;
char* srcBuf = taosMemoryCalloc(1, len);
char* dstBuf = taosMemoryCalloc(1, len);
char* srcBuf = taosMemoryCalloc(1, cap);
char* dstBuf = taosMemoryCalloc(1, cap);
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
char* srcDir = taosMemoryCalloc(1, cap);
char* dstDir = taosMemoryCalloc(1, cap);
if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR;
}
int nBytes = snprintf(srcDir, len, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
int nBytes = snprintf(srcDir, cap, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", p->curChkpId);
if (nBytes != strlen(srcBuf)) {
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
nBytes = snprintf(dstDir, len, "%s", dname);
if (nBytes != strlen(dstBuf)) {
nBytes = snprintf(dstDir, cap, "%s", dname);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
@ -4536,12 +4552,21 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
memset(srcBuf, 0, cap);
memset(dstBuf, 0, cap);
char* filename = taosArrayGetP(p->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, filename);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
nBytes = snprintf(dstBuf, cap, "%s%s%s", dstDir, TD_DIRSEP, filename);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
if (taosCopyFile(srcBuf, dstBuf) < 0) {
terrno = errno;
@ -4553,14 +4578,29 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
char* filename = taosArrayGetP(p->pDel, i);
char* p = taosStrdup(filename);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR;
}
taosArrayPush(list, &p);
}
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
memset(srcBuf, 0, cap);
memset(dstBuf, 0, cap);
nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
if (taosCopyFile(srcBuf, dstBuf) < 0) {
terrno = errno;
stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno));
@ -4568,23 +4608,37 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
}
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
memset(srcBuf, 0, cap);
memset(dstBuf, 0, cap);
nBytes = snprintf(srcBuf, cap, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
nBytes = snprintf(dstBuf, cap, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
if (taosCopyFile(srcBuf, dstBuf) < 0) {
terrno = errno;
stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno));
goto _ERROR;
}
static char* chkpMeta = "META";
memset(dstBuf, 0, len);
sprintf(dstDir, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
memset(dstBuf, 0, cap);
nBytes = snprintf(dstDir, cap, "%s%s%s", dstDir, TD_DIRSEP, chkpMeta);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = errno;
terrno = TAOS_SYSTEM_ERROR(errno);
stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno));
goto _ERROR;
}
@ -4592,23 +4646,20 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
char content[256] = {0};
nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId,
"processVer", processId);
if (nBytes != strlen(content)) {
terrno = TSDB_CODE_INVALID_MSG;
if (nBytes <= 0 || nBytes >= sizeof(content)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
taosCloseFile(&pFile);
code = -1;
goto _ERROR;
}
nBytes = taosWriteFile(pFile, content, strlen(content));
if (nBytes != strlen(content)) {
terrno = errno;
terrno = TAOS_SYSTEM_ERROR(errno);
stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno));
taosCloseFile(&pFile);
code = -1;
goto _ERROR;
}
taosCloseFile(&pFile);
// clear delta data buf
@ -4624,11 +4675,34 @@ _ERROR:
taosMemoryFree(dstDir);
return code;
}
SBkdMgt* bkdMgtCreate(char* path) {
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (p->pDbChkpTbl == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
bkdMgtDestroy(p);
return NULL;
}
p->path = taosStrdup(path);
taosThreadRwlockInit(&p->rwLock, NULL);
if (p->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
bkdMgtDestroy(p);
return NULL;
}
if (taosThreadRwlockInit(&p->rwLock, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
bkdMgtDestroy(p);
return NULL;
}
return p;
}
@ -4656,14 +4730,21 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
SDbChkp* pChkp = ppChkp != NULL ? *ppChkp : NULL;
if (pChkp == NULL) {
char* path = taosMemoryCalloc(1, strlen(bm->path) + 64);
int32_t cap = strlen(bm->path) + 64;
char* path = taosMemoryCalloc(1, cap);
if (path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadRwlockUnlock(&bm->rwLock);
return -1;
}
sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId);
int32_t nBytes = snprintf(path, cap, "%s%s%s", bm->path, TD_DIRSEP, taskId);
if (nBytes <= 0 || nBytes >= cap) {
terrno = TSDB_CODE_OUT_OF_RANGE;
taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock);
return -1;
}
SDbChkp* p = dbChkpCreate(path, chkpId);
if (p == NULL) {