diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index c8d5797dad..9aa616e190 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -80,13 +80,14 @@ void* streamBackendInit(const char* path, int64_t chkpId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); -int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); +int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); STaskBackendWrapper* taskBackendOpen(char* path, char* key); void taskBackendDestroy(void* pBackend); +int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId); void* taskBackendAddRef(void* pTaskBackend); void taskBackendRemoveRef(void* pTaskBackend); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 54aefc8962..f200c714ab 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -40,6 +40,11 @@ typedef struct { SRpcMsg msg; } SStreamContinueExecInfo; +typedef struct { + int64_t streamId; + int64_t taskId; + int64_t chkpId; +} SStreamTaskSnap; extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5e48a345b1..917bc0f796 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -953,7 +953,15 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) { SStreamMeta* pMeta = arg; void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + int32_t code = 0; + while (pIter) { + STaskBackendWrapper* pBackend = *(STaskBackendWrapper**)pIter; + taskBackendAddRef(pBackend); + + code = taskBackendDoCheckpoint((STaskBackendWrapper*)pBackend, chkpId); + + taskBackendRemoveRef(pBackend); pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); } return 0; @@ -1024,7 +1032,10 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { // taosWUnLockLatch(&pMeta->chkpDirLock); } -int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) { +/* + 0 +*/ +int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId) { STaskBackendWrapper* pBackend = arg; int64_t st = taosGetTimestampMs(); int32_t code = -1; @@ -1065,7 +1076,7 @@ _EXIT: taosReleaseRef(taskBackendWrapperId, refId); return code; } -int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); } +int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); } SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)backend; @@ -1577,8 +1588,8 @@ void* taskBackendAddRef(void* pTaskBackend) { return taosAcquireRef(taskBackendWrapperId, pBackend->refId); } void taskBackendRemoveRef(void* pTaskBackend) { - // STaskBackendWrapper* pBackend = pTaskBackend; - // taosReleaseRef(taskBackendWrapperId, pBackend->refId); + STaskBackendWrapper* pBackend = pTaskBackend; + taosReleaseRef(taskBackendWrapperId, pBackend->refId); } // void taskBackendDestroy(STaskBackendWrapper* wrapper); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 88790f2511..a4a507fc13 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -17,6 +17,7 @@ #include "query.h" #include "rocksdb/c.h" #include "streamBackendRocksdb.h" +#include "streamInt.h" #include "tcommon.h" enum SBackendFileType { @@ -39,7 +40,26 @@ typedef struct SBackendFile { SArray* pSst; char* pCheckpointMeta; char* path; + } SBanckendFile; + +typedef struct SBackendSnapFiles2 { + char* pCurrent; + char* pMainfest; + char* pOptions; + SArray* pSst; + char* pCheckpointMeta; + char* path; + + int64_t checkpointId; + int64_t seraial; + int64_t offset; + TdFilePtr fd; + int8_t filetype; + SArray* pFileList; + int32_t currFileIdx; + +} SBackendSnapFile2; struct SStreamSnapHandle { void* handle; SBanckendFile* pBackendFile; @@ -50,6 +70,9 @@ struct SStreamSnapHandle { int8_t filetype; SArray* pFileList; int32_t currFileIdx; + + SArray* pBackendSnapSet; + int32_t currIdx; }; struct SStreamSnapBlockHdr { int8_t type; @@ -108,202 +131,204 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamStateSnapBuild(void* arg, char* path, int64_t chkpId) { - return taskBackendBuildSnap(arg, chkpId); - // int32_t code = 0; - // int8_t validChkp = 0; +int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskBackendBuildSnap(arg, chkpId); } - // int len = strlen(path); - // char* tpath = taosMemoryCalloc(1, len + 256); - // memcpy(tpath, path, len); +void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { + char* buf = taosMemoryCalloc(1, 512); + sprintf(buf, "[current: %s,", pSnapFile->pCurrent); + sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); + sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); - // SStreamMeta *pMeta = arg; - // if (chkpId != 0) { - // sprintf(tpath, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", - // TD_DIRSEP, - // chkpId); - // if (taosIsDir(tpath)) { - // validChkp = 1; - // qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tpath); - // streamBackendAddInUseChkp(pMeta, chkpId); - // } else { - // qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, - // tpath); - // } - // } + for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + char* name = taosArrayGetP(pSnapFile->pSst, i); + sprintf(buf + strlen(buf), "%s,", name); + } + sprintf(buf + strlen(buf) - 1, "]"); - // no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it - // if (validChkp == 0) { - // sprintf(tpath, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); - // char* chkpdir = taosMemoryCalloc(1, len + 256); - // sprintf(chkpdir, "%s%s%s", tpath, TD_DIRSEP, "tmp"); - // taosMemoryFree(tpath); - - // tpath = chkpdir; - // qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tpath); - - // code = streamBackendTriggerChkp(arg, tpath); - // if (code != 0) { - // qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tpath); - // taosMemoryFree(tpath); - // return code; - // } - // chkpId = 0; - // } - - //*dstPath = tpath; + qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf); + taosMemoryFree(buf); } -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { - // impl later - char* tdir = NULL; - int32_t code = streamStateSnapBuild(pMeta, path, chkpId); - if (code != 0) { + +int32_t snapFileCvtMeta(SBackendSnapFile2* pSnapFile) { + SBackendFileItem item; + // current + item.name = pSnapFile->pCurrent; + item.type = ROCKSDB_CURRENT_TYPE; + streamGetFileSize(pSnapFile->path, item.name, &item.size); + taosArrayPush(pSnapFile->pFileList, &item); + + // mainfest + item.name = pSnapFile->pMainfest; + item.type = ROCKSDB_MAINFEST_TYPE; + streamGetFileSize(pSnapFile->path, item.name, &item.size); + taosArrayPush(pSnapFile->pFileList, &item); + + // options + item.name = pSnapFile->pOptions; + item.type = ROCKSDB_OPTIONS_TYPE; + streamGetFileSize(pSnapFile->path, item.name, &item.size); + taosArrayPush(pSnapFile->pFileList, &item); + // sst + for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + char* sst = taosArrayGetP(pSnapFile->pSst, i); + item.name = sst; + item.type = ROCKSDB_SST_TYPE; + streamGetFileSize(pSnapFile->path, item.name, &item.size); + taosArrayPush(pSnapFile->pFileList, &item); + } + // meta + item.name = pSnapFile->pCheckpointMeta; + item.type = ROCKSDB_CHECKPOINT_META_TYPE; + if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) { + taosArrayPush(pSnapFile->pFileList, &item); + } + return 0; +} +int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { + TdDirPtr pDir = taosOpenDir(pSnapFile->path); + if (NULL == pDir) { + qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); return -1; } - qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir); - - TdDirPtr pDir = taosOpenDir(tdir); - if (NULL == pDir) { - qError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir); - goto _err; - } - - SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); - pHandle->pBackendFile = pFile; - pHandle->checkpointId = chkpId; - pHandle->seraial = 0; - - pFile->path = tdir; - pFile->pSst = taosArrayInit(16, sizeof(void*)); - TdDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(pDirEntry); if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) { - pFile->pCurrent = taosStrdup(name); + pSnapFile->pCurrent = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) { - pFile->pMainfest = taosStrdup(name); + pSnapFile->pMainfest = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { - pFile->pOptions = taosStrdup(name); + pSnapFile->pOptions = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) && 0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) { - pFile->pCheckpointMeta = taosStrdup(name); + pSnapFile->pCheckpointMeta = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_SST) && 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { char* sst = taosStrdup(name); - taosArrayPush(pFile->pSst, &sst); + taosArrayPush(pSnapFile->pSst, &sst); } } - { - char* buf = taosMemoryCalloc(1, 512); - sprintf(buf, "[current: %s,", pFile->pCurrent); - sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest); - sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions); - - for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - char* name = taosArrayGetP(pFile->pSst, i); - sprintf(buf + strlen(buf), "%s,", name); - } - sprintf(buf + strlen(buf) - 1, "]"); - - qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf); - taosMemoryFree(buf); - } - taosCloseDir(&pDir); - - if (pFile->pCurrent == NULL) { - qError("%s failed to open %s, reason: no valid file", STREAM_STATE_TRANSFER, tdir); - code = -1; - tdir = NULL; - goto _err; - } - SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); - - SBackendFileItem item; - // current - item.name = pFile->pCurrent; - item.type = ROCKSDB_CURRENT_TYPE; - streamGetFileSize(pFile->path, item.name, &item.size); - taosArrayPush(list, &item); - - // mainfest - item.name = pFile->pMainfest; - item.type = ROCKSDB_MAINFEST_TYPE; - streamGetFileSize(pFile->path, item.name, &item.size); - taosArrayPush(list, &item); - - // options - item.name = pFile->pOptions; - item.type = ROCKSDB_OPTIONS_TYPE; - streamGetFileSize(pFile->path, item.name, &item.size); - taosArrayPush(list, &item); - // sst - for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - char* sst = taosArrayGetP(pFile->pSst, i); - item.name = sst; - item.type = ROCKSDB_SST_TYPE; - streamGetFileSize(pFile->path, item.name, &item.size); - taosArrayPush(list, &item); - } - // meta - item.name = pFile->pCheckpointMeta; - item.type = ROCKSDB_CHECKPOINT_META_TYPE; - if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) { - taosArrayPush(list, &item); - } - - pHandle->pBackendFile = pFile; - - pHandle->currFileIdx = 0; - pHandle->pFileList = list; - pHandle->seraial = 0; - pHandle->offset = 0; - pHandle->handle = pMeta; return 0; +} +int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { + // SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); + int32_t code = -1; + + char* snapPath = taosMemoryCalloc(1, strlen(path) + 256); + sprintf(snapPath, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, pSnap->streamId, + pSnap->taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, pSnap->chkpId); + if (taosIsDir(snapPath)) { + goto _ERROR; + } + + pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); + pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); + pSnapFile->path = snapPath; + if ((code = snapFileReadMeta(pSnapFile)) != 0) { + goto _ERROR; + } + if ((code = snapFileCvtMeta(pSnapFile)) != 0) { + goto _ERROR; + } + + snapFileDebugInfo(pSnapFile); + + code = 0; + +_ERROR: + taosMemoryFree(snapPath); + return code; +} +void snapFileDestroy(SBackendSnapFile2* pSnap) { + taosMemoryFree(pSnap->pCheckpointMeta); + taosMemoryFree(pSnap->pCurrent); + taosMemoryFree(pSnap->pMainfest); + taosMemoryFree(pSnap->pOptions); + taosMemoryFree(pSnap->path); + for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) { + char* sst = taosArrayGetP(pSnap->pSst, i); + taosMemoryFree(sst); + } + taosArrayDestroy(pSnap->pFileList); + taosArrayDestroy(pSnap->pSst); + taosCloseFile(&pSnap->fd); + + return; +} +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { + // impl later + + SArray* pSnapSet = NULL; + int32_t code = streamBackendGetSnapInfo(pMeta, path, chkpId); + if (code != 0) { + return -1; + } + + SArray* pBdSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + + for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); + + SBackendSnapFile2 snapFile = {0}; + code = streamBackendSnapInitFile(path, pSnap, &snapFile); + ASSERT(code == 0); + taosArrayPush(pBdSnapSet, &snapFile); + } + + pHandle->pBackendSnapSet = pBdSnapSet; + pHandle->currIdx = 0; + return 0; + _err: streamSnapHandleDestroy(pHandle); - taosMemoryFreeClear(tdir); code = -1; return code; } void streamSnapHandleDestroy(SStreamSnapHandle* handle) { - SBanckendFile* pFile = handle->pBackendFile; + // SBanckendFile* pFile = handle->pBackendFile; + if (handle->pBackendSnapSet) { + for (int i = 0; i < taosArrayGetSize(handle->pBackendSnapSet); i++) { + SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pBackendSnapSet, i); + snapFileDestroy(pSnapFile); + } + taosArrayDestroy(handle->pBackendSnapSet); + } - if (handle->checkpointId == 0) { - // del tmp dir - if (pFile && taosIsDir(pFile->path)) { - taosRemoveDir(pFile->path); - } - } else { - streamBackendDelInUseChkp(handle->handle, handle->checkpointId); - } - if (pFile) { - taosMemoryFree(pFile->pCheckpointMeta); - taosMemoryFree(pFile->pCurrent); - taosMemoryFree(pFile->pMainfest); - taosMemoryFree(pFile->pOptions); - taosMemoryFree(pFile->path); - for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - char* sst = taosArrayGetP(pFile->pSst, i); - taosMemoryFree(sst); - } - taosArrayDestroy(pFile->pSst); - taosMemoryFree(pFile); - } - taosArrayDestroy(handle->pFileList); - taosCloseFile(&handle->fd); + // if (handle->checkpointId == 0) { + // // del tmp dir + // if (pFile && taosIsDir(pFile->path)) { + // taosRemoveDir(pFile->path); + // } + // } else { + // streamBackendDelInUseChkp(handle->handle, handle->checkpointId); + // } + // if (pFile) { + // taosMemoryFree(pFile->pCheckpointMeta); + // taosMemoryFree(pFile->pCurrent); + // taosMemoryFree(pFile->pMainfest); + // taosMemoryFree(pFile->pOptions); + // taosMemoryFree(pFile->path); + // for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + // char* sst = taosArrayGetP(pFile->pSst, i); + // taosMemoryFree(sst); + // } + // taosArrayDestroy(pFile->pSst); + // taosMemoryFree(pFile); + // } + // taosArrayDestroy(handle->pFileList); + // taosCloseFile(&handle->fd); return; }