add self check info

This commit is contained in:
Yihao Deng 2024-06-25 12:04:10 +00:00
parent 33aef6ddc5
commit 49ba8132c0
2 changed files with 139 additions and 20 deletions

View File

@ -1178,6 +1178,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
}
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
// vnode task->db
SStreamMeta* pMeta = arg;
taosThreadMutexLock(&pMeta->backendMutex);
@ -1186,27 +1187,44 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
while (pIter) {
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
taskDbAddRef(pTaskDb);
int64_t chkpId = pTaskDb->chkpId;
taskDbRefChkp(pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, chkpId, 0);
if (code != 0) {
taskDbUnRefChkp(pTaskDb, chkpId);
void* p = taskDbAddRef(pTaskDb);
if (p == NULL) {
terrno = 0;
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
continue;
}
taskDbRemoveRef(pTaskDb);
// add chkpId to in-use-ckpkIdSet
taskDbRefChkp(pTaskDb, pTaskDb->chkpId);
code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId, ((SStreamTask*)pTaskDb->pTask)->chkInfo.processedVer);
if (code != 0) {
// remove chkpId from in-use-ckpkIdSet
taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
taskDbRemoveRef(pTaskDb);
code = -1;
break;
}
SStreamTask* pTask = pTaskDb->pTask;
SStreamTaskSnap snap = {.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
.chkpId = pTaskDb->chkpId,
.dbPrefixPath = taosStrdup(pTaskDb->path)};
if (snap.dbPrefixPath == NULL) {
// remove chkpid from chkp-in-use set
taskDbUnRefChkp(pTaskDb, pTaskDb->chkpId);
taskDbRemoveRef(pTaskDb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
break;
}
taosArrayPush(pSnap, &snap);
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
return code;
}
int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) {
@ -2172,23 +2190,35 @@ void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
int32_t code = 0;
char* statePath = taosMemoryCalloc(1, strlen(path) + 128);
if (statePath == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
char* statePath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key);
if (!taosDirExist(statePath)) {
code = taosMulMkDir(statePath);
if (code != 0) {
stError("failed to create dir: %s, reason:%s", statePath, tstrerror(code));
terrno = errno;
stError("failed to create dir: %s, reason:%s", statePath, tstrerror(terrno));
taosMemoryFree(statePath);
return code;
}
}
char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128);
if (dbPath == NULL) {
taosMemoryFree(statePath);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state");
if (!taosDirExist(dbPath)) {
code = taosMulMkDir(dbPath);
if (code != 0) {
terrno = errno;
stError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
taosMemoryFree(statePath);
taosMemoryFree(dbPath);
@ -2384,6 +2414,11 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
}
char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (taosIsDir(buf)) {
code = 0;
@ -2402,6 +2437,11 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32);
if (temp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
if (taosDirExist(temp)) {
@ -4239,14 +4279,12 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(p->pCurrent);
p->pCurrent = taosStrdup(name);
// taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(p->pManifest);
p->pManifest = taosStrdup(name);
// taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
@ -4301,31 +4339,75 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
return 0;
}
void dbChkpDestroy(SDbChkp* pChkp);
/*
 * Create a checkpoint tracker for the directory `path`.
 *
 * path       - heap string naming the task's backend directory. The pointer
 *              itself (not a copy) is stored in the returned object, and
 *              dbChkpDestroy() releases it together with the object.
 * initChkpId - checkpoint id recorded as the current checkpoint and passed
 *              to dbChkpGetDelta() for the initial scan.
 *
 * Returns the new object, or NULL on failure. On allocation failures terrno
 * is set to TSDB_CODE_OUT_OF_MEMORY; a dbChkpGetDelta() failure leaves
 * terrno as that call left it.
 *
 * Ownership of `path` moves to the returned object ONLY on success. On every
 * failure path `path` is left untouched and still belongs to the caller, so
 * the caller can release it unconditionally when NULL comes back.
 * NOTE(review): confirm every caller frees `path` itself on a NULL return.
 */
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
  int32_t  code = 0;
  SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
  if (p == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->curChkpId = initChkpId;
  p->preCkptId = -1;
  p->pSST = taosArrayInit(64, sizeof(void*));
  if (p->pSST == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->path = path;
  p->len = strlen(path) + 128;
  p->buf = taosMemoryCalloc(1, p->len);
  if (p->buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->idx = 0;
  p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[0] == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (p->pSstTbl[1] == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->pAdd = taosArrayInit(64, sizeof(void*));
  if (p->pAdd == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->pDel = taosArrayInit(64, sizeof(void*));
  if (p->pDel == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _EXIT;
  }

  p->update = 0;
  taosThreadRwlockInit(&p->rwLock, NULL);

  // Initial scan of the checkpoint directory; no delta list is requested here.
  code = dbChkpGetDelta(p, initChkpId, NULL);
  if (code != 0) {
    goto _EXIT;
  }

  return p;

_EXIT:
  if (p != NULL) {
    // dbChkpDestroy() frees pChkp->path. Detach it first so a failed create
    // never releases the caller's string: previously the string was freed only
    // when the failure happened after `p->path = path`, which the caller could
    // not detect.
    p->path = NULL;
    dbChkpDestroy(p);
  }
  return NULL;
}
void dbChkpDestroy(SDbChkp* pChkp) {
if (pChkp == NULL) return;
taosMemoryFree(pChkp->buf);
taosMemoryFree(pChkp->path);
@ -4357,6 +4439,11 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
if (srcBuf == NULL || dstBuf == NULL || srcDir == NULL || dstDir == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _ERROR;
}
sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s", dname);
@ -4375,7 +4462,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
terrno = errno;
stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno));
goto _ERROR;
}
}
@ -4392,7 +4480,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pCurrent, p->curChkpId);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
terrno = errno;
stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno));
goto _ERROR;
}
@ -4402,7 +4491,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s_%" PRId64 "", dstDir, TD_DIRSEP, p->pManifest, p->curChkpId);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
terrno = errno;
stError("failed to copy file from %s to %s, reason:%s", srcBuf, dstBuf, tstrerror(terrno));
goto _ERROR;
}
@ -4412,17 +4502,21 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
TdFilePtr pFile = taosOpenFile(dstDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
stError("chkp failed to create meta file: %s", dstDir);
terrno = errno;
stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno));
goto _ERROR;
}
char content[128] = {0};
snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, p->pManifest,
p->curChkpId);
if (taosWriteFile(pFile, content, strlen(content)) <= 0) {
stError("chkp failed to write meta file: %s", dstDir);
terrno = errno;
stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno));
taosCloseFile(&pFile);
code = -1;
goto _ERROR;
}
taosCloseFile(&pFile);
// clear delta data buf
@ -4471,6 +4565,12 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
if (pChkp == NULL) {
char* path = taosMemoryCalloc(1, strlen(bm->path) + 64);
if (path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadRwlockUnlock(&bm->rwLock);
return -1;
}
sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId);
SDbChkp* p = dbChkpCreate(path, chkpId);

View File

@ -24,6 +24,7 @@ enum SBackendFileType {
ROCKSDB_SST_TYPE = 3,
ROCKSDB_CURRENT_TYPE = 4,
ROCKSDB_CHECKPOINT_META_TYPE = 5,
ROCKSDB_CHECKPOINT_SELFCHECK_TYPE = 6,
};
typedef struct SBackendFileItem {
@ -49,6 +50,7 @@ typedef struct SBackendSnapFiles2 {
char* pOptions;
SArray* pSst;
char* pCheckpointMeta;
char* pCheckpointSelfcheck;
char* path;
int64_t checkpointId;
@ -111,6 +113,7 @@ const char* ROCKSDB_MAINFEST = "MANIFEST";
const char* ROCKSDB_SST = "sst";
const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
const char* ROCKSDB_CHECKPOINT_SELF_CHECK = "info";
static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
@ -127,6 +130,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int32_t ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL, NULL);
@ -148,7 +152,8 @@ int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDest
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
char* buf = taosMemoryCalloc(1, 512);
int16_t cap = 511;
char* buf = taosMemoryCalloc(1, cap + 1);
sprintf(buf + strlen(buf), "[");
if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
@ -157,10 +162,10 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (pSnapFile->pSst) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name);
}
}
sprintf(buf + strlen(buf) - 1, "]");
if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]");
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
@ -199,6 +204,13 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
// meta
item.name = pSnapFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
taosArrayPush(pSnapFile->pFileList, &item);
}
item.name = pSnapFile->pCheckpointSelfcheck;
item.type = ROCKSDB_CHECKPOINT_SELFCHECK_TYPE;
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
taosArrayPush(pSnapFile->pFileList, &item);
}
@ -231,6 +243,11 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
pSnapFile->pCheckpointMeta = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_SELF_CHECK) &&
0 == strncmp(name, ROCKSDB_CHECKPOINT_SELF_CHECK, strlen(ROCKSDB_CHECKPOINT_SELF_CHECK))) {
pSnapFile->pCheckpointSelfcheck = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_SST) &&
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
char* sst = taosStrdup(name);
@ -276,6 +293,7 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pMainfest);
taosMemoryFree(pSnap->pOptions);
taosMemoryFree(pSnap->path);
taosMemoryFree(pSnap->pCheckpointSelfcheck);
for (int32_t i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst);
@ -298,6 +316,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap));
int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet);
if (code != 0) {
stError("failed to do task db snap info, reason:%s", tstrerror(terrno));
taosArrayDestroy(pSnapInfoSet);
return -1;
}