add self check

This commit is contained in:
Yihao Deng 2024-06-27 09:51:49 +00:00
parent 1004ac6924
commit de77ce6480
4 changed files with 182 additions and 85 deletions

BIN
out Executable file

Binary file not shown.

View File

@ -19,6 +19,8 @@
#include "tcommon.h"
#include "tref.h"
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
@ -233,15 +235,28 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0;
}
int32_t remoteChkp_readMetaData(char* path, SArray* list) {
int32_t cap = strlen(path);
char* metaPath = taosMemoryCalloc(1, cap + 32);
typedef struct {
char pCurrName[24];
int64_t currChkptId;
char pManifestName[24];
int64_t manifestChkptId;
char processName[24];
int64_t processId;
} SSChkpMetaOnS3;
int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) {
int32_t cap = strlen(path) + 32;
char* metaPath = taosMemoryCalloc(1, cap);
if (metaPath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META") >= (cap + 32)) {
int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META");
if (n <= 0 || n >= (cap - 1)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(metaPath);
return -1;
@ -254,23 +269,23 @@ int32_t remoteChkp_readMetaData(char* path, SArray* list) {
return -1;
}
char buf[128] = {0};
char buf[256] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(metaPath);
taosCloseFile(&pFile);
return -1;
}
int32_t len = strlen(buf);
for (int i = 0; i < len; i++) {
if (buf[i] == '\n') {
char* item = taosMemoryCalloc(1, i + 1);
memcpy(item, buf, i);
taosArrayPush(list, &item);
item = taosMemoryCalloc(1, len - i);
memcpy(item, buf + i + 1, len - i - 1);
taosArrayPush(list, &item);
}
SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3));
n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId,
p->processName, &p->processId);
if (n != 6) {
terrno = TSDB_CODE_INVALID_MSG;
taosMemoryFree(p);
taosMemoryFree(metaPath);
taosCloseFile(&pFile);
return -1;
}
taosCloseFile(&pFile);
@ -291,7 +306,7 @@ int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) {
}
return valid;
}
int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) {
int32_t complete = 1;
int32_t len = strlen(path) + 32;
char* src = taosMemoryCalloc(1, len);
@ -301,33 +316,38 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
return -1;
}
if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int8_t count = 0;
for (int i = 0; i < taosArrayGetSize(list); i++) {
char* p = taosArrayGetP(list, i);
sprintf(src, "%s%s%s", path, TD_DIRSEP, p);
// for (int i = 0; i < taosArrayGetSize(list); i++) {
// char* p = taosArrayGetP(list, i);
// sprintf(src, "%s%s%s", path, TD_DIRSEP, p);
// check file exist
if (taosStatFile(src, NULL, NULL, NULL) != 0) {
complete = 0;
break;
}
// // check file exist
// if (taosStatFile(src, NULL, NULL, NULL) != 0) {
// complete = 0;
// break;
// }
// check file name
char temp[64] = {0};
if (remoteChkp_validMetaFile(p, temp, chkpId)) {
count++;
}
// // check file name
// char temp[64] = {0};
// if (remoteChkp_validMetaFile(p, temp, chkpId)) {
// count++;
// }
// rename file
sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp);
taosRenameFile(src, dst);
// // rename file
// sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp);
// taosRenameFile(src, dst);
memset(src, 0, len);
memset(dst, 0, len);
}
if (count != taosArrayGetSize(list)) {
complete = 0;
}
// memset(src, 0, len);
// memset(dst, 0, len);
// }
// if (count != taosArrayGetSize(list)) {
// complete = 0;
// }
taosMemoryFree(src);
taosMemoryFree(dst);
@ -385,12 +405,14 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
if (taosIsDir(tmp)) taosRemoveDir(tmp);
if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp);
SArray* list = taosArrayInit(2, sizeof(void*));
code = remoteChkp_readMetaData(chkpPath, list);
// SArray* list = taosArrayInit(2, sizeof(void*));
SSChkpMetaOnS3* pMeta;
code = remoteChkp_readMetaData(chkpPath, &pMeta);
if (code == 0) {
code = remoteChkp_validAndCvtMeta(chkpPath, list, chkpId);
code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId);
}
taosArrayDestroyP(list, taosMemoryFree);
taosMemoryFree(pMeta);
// taosArrayDestroyP(list, taosMemoryFree);
if (code == 0) {
taosMkDir(defaultPath);
@ -1322,6 +1344,9 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId)
TdFilePtr pFile = NULL;
int32_t code = -1;
char buf[256] = {0};
int32_t nBytes = 0;
int32_t len = strlen(pChkpIdDir);
if (len == 0) {
terrno = TSDB_CODE_INVALID_PARA;
@ -1336,7 +1361,8 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId)
goto _EXIT;
}
if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) <= 0) {
nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
if (nBytes != strlen(pDst)) {
code = -1;
stError("failed to build dst to load extra info, dir:%s", pChkpIdDir);
goto _EXIT;
@ -1349,7 +1375,6 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId)
goto _EXIT;
}
char buf[256] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to read file to load extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
@ -1368,8 +1393,12 @@ _EXIT:
return code;
}
int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
int32_t code = -1;
TdFilePtr pFile = NULL;
int32_t code = -1;
char buf[256] = {0};
int32_t nBytes = 0;
int32_t len = strlen(pChkpIdDir);
if (len == 0) {
@ -1385,7 +1414,8 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
goto _EXIT;
}
if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) < 0) {
nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP);
if (nBytes != strlen(pDst)) {
stError("failed to build dst to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
@ -1397,15 +1427,14 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) {
goto _EXIT;
}
char buf[256] = {0};
int n = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId);
if (n <= 0 || n >= sizeof(buf)) {
nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId);
if (nBytes != strlen(buf)) {
code = -1;
stError("failed to build content to add extra info, dir:%s", pChkpIdDir);
goto _EXIT;
}
if (taosWriteFile(pFile, buf, strlen(buf)) <= 0) {
if (nBytes != taosWriteFile(pFile, buf, nBytes)) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(terrno));
goto _EXIT;
@ -2430,18 +2459,27 @@ void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int32_t code = -1;
int64_t refId = pDb->refId;
int32_t nBytes = 0;
if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
return -1;
}
char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128);
int32_t cap = strlen(pDb->path) + 128;
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
nBytes =
snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (nBytes != strlen(buf)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
if (taosIsDir(buf)) {
code = 0;
*path = buf;
@ -4473,8 +4511,18 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
goto _ERROR;
}
sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s", dname);
int nBytes = snprintf(srcDir, len, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", p->curChkpId);
if (nBytes != strlen(srcBuf)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
nBytes = snprintf(dstDir, len, "%s", dname);
if (nBytes != strlen(dstBuf)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
goto _ERROR;
}
if (!taosDirExist(srcDir)) {
stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
@ -4540,14 +4588,20 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno));
goto _ERROR;
}
// META_ON_S3
// current_checkpointID
// manifest_checkpointID
// processVer_processID
char content[128] = {0};
snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId,
p->pManifest, p->curChkpId, "processVer", processId);
if (taosWriteFile(pFile, content, strlen(content)) <= 0) {
char content[256] = {0};
nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId,
"processVer", processId);
if (nBytes != strlen(content)) {
terrno = TSDB_CODE_INVALID_MSG;
stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir);
taosCloseFile(&pFile);
code = -1;
goto _ERROR;
}
nBytes = taosWriteFile(pFile, content, strlen(content));
if (nBytes != strlen(content)) {
terrno = errno;
stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno));
taosCloseFile(&pFile);
@ -4612,17 +4666,28 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId);
SDbChkp* p = dbChkpCreate(path, chkpId);
taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*));
if (p == NULL) {
taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock);
return -1;
}
if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) {
dbChkpDestroy(p);
taosMemoryFree(path);
taosThreadRwlockUnlock(&bm->rwLock);
return -1;
}
pChkp = p;
code = dbChkpDumpTo(pChkp, dname, list);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
} else {
code = dbChkpGetDelta(pChkp, chkpId, NULL);
code = dbChkpGetDelta(pChkp, chkpId, NULL);
code = dbChkpDumpTo(pChkp, dname, list);
if (code == 0) code = dbChkpDumpTo(pChkp, dname, list);
}
taosThreadRwlockUnlock(&bm->rwLock);
return code;

View File

@ -527,27 +527,41 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
}
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
char buf[128] = {0};
TdFilePtr pFile = NULL;
int32_t cap = strlen(path) + 32;
char buf[128] = {0};
int32_t code = 0;
char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
char* filePath = taosMemoryCalloc(1, cap);
if (filePath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t code = downloadCheckpointDataByName(id, "META", file);
int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
if (nBytes != strlen(filePath)) {
taosMemoryFree(filePath);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
code = downloadCheckpointDataByName(id, "META", filePath);
if (code != 0) {
stDebug("%s chkp failed to download meta file:%s", id, file);
taosMemoryFree(file);
stDebug("%s chkp failed to download meta file:%s", id, filePath);
taosMemoryFree(filePath);
return code;
}
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
pFile = taosOpenFile(filePath, TD_FILE_READ);
if (pFile == NULL) {
stError("%s failed to open meta file:%s for checkpoint", id, file);
code = -1;
return code;
terrno = TAOS_SYSTEM_ERROR(errno);
stError("%s failed to open meta file:%s for checkpoint", id, filePath);
taosMemoryFree(filePath);
return -1;
}
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
stError("%s failed to read meta file:%s for checkpoint", id, file);
stError("%s failed to read meta file:%s for checkpoint", id, filePath);
code = -1;
} else {
int32_t len = strnlen(buf, tListLen(buf));
@ -565,27 +579,33 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
}
taosCloseFile(&pFile);
taosRemoveFile(file);
taosMemoryFree(file);
taosRemoveFile(filePath);
taosMemoryFree(filePath);
return code;
}
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
char* path = NULL;
int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
int64_t now = taosGetTimestampMs();
SStreamMeta* pMeta = pTask->pMeta;
const char* idStr = pTask->id.idStr;
int64_t now = taosGetTimestampMs();
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
if (toDelFiles == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
pTask->id.idStr)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(terrno));
}
if (type == DATA_UPLOAD_S3) {
if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
tstrerror(terrno));
}
}
@ -594,7 +614,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
} else {
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path);
}
}

12
t.c Normal file
View File

@ -0,0 +1,12 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int main() {
char *buf = calloc(1, 4);
int n = snprintf(buf, 4, "size");
printf("write size:%d \t buf:%s \t len:%d\n", n, buf, (int)(strlen(buf)));
buf[4] = 10;
return 1;
}