diff --git a/out b/out new file mode 100755 index 0000000000..21f5cbee37 Binary files /dev/null and b/out differ diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a710f2531a..7ff651d190 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -19,6 +19,8 @@ #include "tcommon.h" #include "tref.h" +#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" + typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -233,15 +235,28 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -int32_t remoteChkp_readMetaData(char* path, SArray* list) { - int32_t cap = strlen(path); - char* metaPath = taosMemoryCalloc(1, cap + 32); +typedef struct { + char pCurrName[24]; + int64_t currChkptId; + + char pManifestName[24]; + int64_t manifestChkptId; + + char processName[24]; + int64_t processId; +} SSChkpMetaOnS3; + +int32_t remoteChkp_readMetaData(char* path, SSChkpMetaOnS3** pMeta) { + int32_t cap = strlen(path) + 32; + + char* metaPath = taosMemoryCalloc(1, cap); if (metaPath == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META") >= (cap + 32)) { + int32_t n = sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); + if (n <= 0 || n >= (cap - 1)) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(metaPath); return -1; @@ -254,23 +269,23 @@ int32_t remoteChkp_readMetaData(char* path, SArray* list) { return -1; } - char buf[128] = {0}; + char buf[256] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(metaPath); taosCloseFile(&pFile); return -1; } - int32_t len = strlen(buf); - for (int i = 0; i < len; i++) { - if (buf[i] == '\n') { - char* item = taosMemoryCalloc(1, i + 1); - memcpy(item, buf, i); - taosArrayPush(list, &item); - item = taosMemoryCalloc(1, len - i); 
- memcpy(item, buf + i + 1, len - i - 1); - taosArrayPush(list, &item); - } + SSChkpMetaOnS3* p = taosMemoryCalloc(1, sizeof(SSChkpMetaOnS3)); + n = sscanf(buf, META_ON_S3_FORMATE, p->pCurrName, &p->currChkptId, p->pManifestName, &p->manifestChkptId, + p->processName, &p->processId); + if (n != 6) { + terrno = TSDB_CODE_INVALID_MSG; + taosMemoryFree(p); + taosMemoryFree(metaPath); + taosCloseFile(&pFile); + return -1; } taosCloseFile(&pFile); @@ -291,7 +306,7 @@ int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) { } return valid; } -int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { +int32_t remoteChkp_validAndCvtMeta(char* path, SSChkpMetaOnS3* pMeta, int64_t chkpId) { int32_t complete = 1; int32_t len = strlen(path) + 32; char* src = taosMemoryCalloc(1, len); @@ -301,33 +316,40 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return -1; } + if (pMeta->currChkptId != chkpId || pMeta->manifestChkptId != chkpId) { + terrno = TSDB_CODE_INVALID_MSG; /* META names a different checkpoint id; this is not an allocation failure */ + taosMemoryFree(src); /* release both path buffers, as the normal exit path does */ + taosMemoryFree(dst); + return -1; + } + int8_t count = 0; - for (int i = 0; i < taosArrayGetSize(list); i++) { - char* p = taosArrayGetP(list, i); - sprintf(src, "%s%s%s", path, TD_DIRSEP, p); + // for (int i = 0; i < taosArrayGetSize(list); i++) { + // char* p = taosArrayGetP(list, i); + // sprintf(src, "%s%s%s", path, TD_DIRSEP, p); - // check file exist - if (taosStatFile(src, NULL, NULL, NULL) != 0) { - complete = 0; - break; - } + // // check file exist + // if (taosStatFile(src, NULL, NULL, NULL) != 0) { + // complete = 0; + // break; + // } - // check file name - char temp[64] = {0}; - if (remoteChkp_validMetaFile(p, temp, chkpId)) { - count++; - } + // // check file name + // char temp[64] = {0}; + // if (remoteChkp_validMetaFile(p, temp, chkpId)) { + // count++; + // } - // rename file - sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); - taosRenameFile(src, dst); + // // rename file + // sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); + //
taosRenameFile(src, dst); - memset(src, 0, len); - memset(dst, 0, len); - } - if (count != taosArrayGetSize(list)) { - complete = 0; - } + // memset(src, 0, len); + // memset(dst, 0, len); + // } + // if (count != taosArrayGetSize(list)) { + // complete = 0; + // } taosMemoryFree(src); taosMemoryFree(dst); @@ -385,12 +405,14 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId if (taosIsDir(tmp)) taosRemoveDir(tmp); if (taosIsDir(defaultPath)) taosRenameFile(defaultPath, tmp); - SArray* list = taosArrayInit(2, sizeof(void*)); - code = remoteChkp_readMetaData(chkpPath, list); + // SArray* list = taosArrayInit(2, sizeof(void*)); + SSChkpMetaOnS3* pMeta = NULL; /* freed unconditionally below, so it must be NULL when the read fails */ + code = remoteChkp_readMetaData(chkpPath, &pMeta); if (code == 0) { - code = remoteChkp_validAndCvtMeta(chkpPath, list, chkpId); + code = remoteChkp_validAndCvtMeta(chkpPath, pMeta, chkpId); } - taosArrayDestroyP(list, taosMemoryFree); + taosMemoryFree(pMeta); + // taosArrayDestroyP(list, taosMemoryFree); if (code == 0) { taosMkDir(defaultPath); @@ -1322,6 +1344,9 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) TdFilePtr pFile = NULL; int32_t code = -1; + char buf[256] = {0}; + int32_t nBytes = 0; + int32_t len = strlen(pChkpIdDir); if (len == 0) { terrno = TSDB_CODE_INVALID_PARA; @@ -1336,7 +1361,8 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) goto _EXIT; } - if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) <= 0) { + nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes != strlen(pDst)) { code = -1; stError("failed to build dst to load extra info, dir:%s", pChkpIdDir); goto _EXIT; @@ -1349,7 +1375,6 @@ int32_t chkpLoadExtraInfo(char* pChkpIdDir, int64_t* chkpId, int64_t* processId) goto _EXIT; } - char buf[256] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { terrno = TAOS_SYSTEM_ERROR(errno); stError("failed to read file to load extra info, file:%s, reason:%s", pDst,
tstrerror(terrno)); @@ -1368,8 +1393,12 @@ _EXIT: return code; } int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { + int32_t code = -1; + TdFilePtr pFile = NULL; - int32_t code = -1; + + char buf[256] = {0}; + int32_t nBytes = 0; int32_t len = strlen(pChkpIdDir); if (len == 0) { @@ -1385,7 +1414,8 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { goto _EXIT; } - if (sprintf(pDst, "%s%sinfo", pChkpIdDir, TD_DIRSEP) < 0) { + nBytes = snprintf(pDst, len + 64, "%s%sinfo", pChkpIdDir, TD_DIRSEP); + if (nBytes != strlen(pDst)) { stError("failed to build dst to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } @@ -1397,15 +1427,14 @@ int32_t chkpAddExtraInfo(char* pChkpIdDir, int64_t chkpId, int64_t processId) { goto _EXIT; } - char buf[256] = {0}; - int n = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); - if (n <= 0 || n >= sizeof(buf)) { + nBytes = snprintf(buf, sizeof(buf), "%" PRId64 " %" PRId64 "", chkpId, processId); + if (nBytes != strlen(buf)) { code = -1; stError("failed to build content to add extra info, dir:%s", pChkpIdDir); goto _EXIT; } - if (taosWriteFile(pFile, buf, strlen(buf)) <= 0) { + if (nBytes != taosWriteFile(pFile, buf, nBytes)) { terrno = TAOS_SYSTEM_ERROR(errno); stError("failed to write file to add extra info, file:%s, reason:%s", pDst, tstrerror(terrno)); goto _EXIT; @@ -2430,18 +2459,27 @@ void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { int32_t code = -1; int64_t refId = pDb->refId; + int32_t nBytes = 0; if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { return -1; } - char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128); + int32_t cap = strlen(pDb->path) + 128; + + char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", 
TD_DIRSEP, "checkpoint", chkpId); + nBytes = + snprintf(buf, cap, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); + if (nBytes != strlen(buf)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + if (taosIsDir(buf)) { code = 0; *path = buf; @@ -4473,8 +4511,18 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { goto _ERROR; } - sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); - sprintf(dstDir, "%s", dname); + int nBytes = snprintf(srcDir, len, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", p->curChkpId); + if (nBytes != strlen(srcDir)) { /* truncation check must measure the buffer snprintf just wrote */ + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } + + nBytes = snprintf(dstDir, len, "%s", dname); + if (nBytes != strlen(dstDir)) { /* same check, against dstDir rather than an unrelated buffer */ + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); @@ -4540,14 +4588,20 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { stError("chkp failed to create meta file: %s, reason:%s", dstDir, tstrerror(terrno)); goto _ERROR; } - // META_ON_S3 - // current_checkpointID - // manifest_checkpointID - // processVer_processID - char content[128] = {0}; - snprintf(content, sizeof(content), "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "", p->pCurrent, p->curChkpId, - p->pManifest, p->curChkpId, "processVer", processId); - if (taosWriteFile(pFile, content, strlen(content)) <= 0) { + + char content[256] = {0}; + nBytes = snprintf(content, sizeof(content), META_ON_S3_FORMATE, p->pCurrent, p->curChkpId, p->pManifest, p->curChkpId, + "processVer", processId); + if (nBytes != strlen(content)) { + terrno = TSDB_CODE_INVALID_MSG; + stError("chkp failed to format meta file: %s, reason: invalid msg", dstDir); + taosCloseFile(&pFile); + code = -1; + goto _ERROR; + } + + nBytes = taosWriteFile(pFile, content, strlen(content)); + if
(nBytes != strlen(content)) { terrno = errno; stError("chkp failed to write meta file: %s,reason:%s", dstDir, tstrerror(terrno)); taosCloseFile(&pFile); @@ -4612,17 +4666,28 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, sprintf(path, "%s%s%s", bm->path, TD_DIRSEP, taskId); SDbChkp* p = dbChkpCreate(path, chkpId); - taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)); + if (p == NULL) { + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + return -1; + } + + if (taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)) != 0) { + dbChkpDestroy(p); + taosMemoryFree(path); + taosThreadRwlockUnlock(&bm->rwLock); + return -1; + } pChkp = p; - code = dbChkpDumpTo(pChkp, dname, list); taosThreadRwlockUnlock(&bm->rwLock); return code; - } + } else { + code = dbChkpGetDelta(pChkp, chkpId, NULL); - code = dbChkpGetDelta(pChkp, chkpId, NULL); - code = dbChkpDumpTo(pChkp, dname, list); + if (code == 0) code = dbChkpDumpTo(pChkp, dname, list); + } taosThreadRwlockUnlock(&bm->rwLock); return code; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 26df7b1627..bc3762a6d5 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -527,27 +527,41 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { - char buf[128] = {0}; + TdFilePtr pFile = NULL; + int32_t cap = strlen(path) + 32; + char buf[128] = {0}; + int32_t code = 0; - char* file = taosMemoryCalloc(1, strlen(path) + 32); - sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + char* filePath = taosMemoryCalloc(1, cap); + if (filePath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - int32_t code = downloadCheckpointDataByName(id, "META", file); + int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP"); + if 
(nBytes != strlen(filePath)) { + taosMemoryFree(filePath); + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + + code = downloadCheckpointDataByName(id, "META", filePath); if (code != 0) { - stDebug("%s chkp failed to download meta file:%s", id, file); - taosMemoryFree(file); + stDebug("%s chkp failed to download meta file:%s", id, filePath); + taosMemoryFree(filePath); return code; } - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + pFile = taosOpenFile(filePath, TD_FILE_READ); if (pFile == NULL) { - stError("%s failed to open meta file:%s for checkpoint", id, file); - code = -1; - return code; + terrno = TAOS_SYSTEM_ERROR(errno); + stError("%s failed to open meta file:%s for checkpoint", id, filePath); + taosMemoryFree(filePath); + return -1; } if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - stError("%s failed to read meta file:%s for checkpoint", id, file); + stError("%s failed to read meta file:%s for checkpoint", id, filePath); code = -1; } else { int32_t len = strnlen(buf, tListLen(buf)); @@ -565,27 +579,33 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } taosCloseFile(&pFile); - taosRemoveFile(file); - taosMemoryFree(file); + taosRemoveFile(filePath); + taosMemoryFree(filePath); return code; } int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) { char* path = NULL; int32_t code = 0; - SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); - int64_t now = taosGetTimestampMs(); SStreamMeta* pMeta = pTask->pMeta; const char* idStr = pTask->id.idStr; + int64_t now = taosGetTimestampMs(); + + SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES); + if (toDelFiles == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, pTask->id.idStr)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId); + 
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(terrno)); } if (type == DATA_UPLOAD_S3) { if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId); + stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId, + tstrerror(terrno)); } } @@ -594,7 +614,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d if (code == TSDB_CODE_SUCCESS) { stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { - stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path); + stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path, tstrerror(terrno)); /* every %s needs a matching argument */ } } diff --git a/t.c b/t.c new file mode 100644 index 0000000000..a79ed4c134 --- /dev/null +++ b/t.c @@ -0,0 +1,12 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +int main() { + char *buf = calloc(1, 4); + int n = snprintf(buf, 4, "size"); + + printf("write size:%d \t buf:%s \t len:%d\n", n, buf, (int)(strlen(buf))); + buf[3] = 10; /* last valid index of the 4-byte allocation; buf[4] is out of bounds */ + return 1; +}