diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 260609e697..1d4e87e2bd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -410,7 +410,7 @@ typedef struct SStreamMeta { int32_t walScanCounter; void* streamBackend; int64_t streamBackendRid; - SHashObj* pTaskBackendUnique; + SHashObj* pTaskDbUnique; TdThreadMutex backendMutex; SMetaHbInfo hbInfo; int32_t closedTask; @@ -727,7 +727,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); -void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref); +void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3546df7e91..fbd8a3ee05 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -738,7 +738,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); if (code != TSDB_CODE_SUCCESS) return code; - pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, &pTask->backendRefId); + pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, pTask->checkpointingId, + &pTask->backendRefId); + + // taskDbUpdateChkpId(pTask->pBackend, pTask->checkpointingId); if (pTask->pBackend == NULL) return -1; streamTaskOpenAllUpstreamInput(pTask); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 0f97bda4c5..2403bca7f6 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -83,12 +83,14 @@ int32_t streamBackendLoadCheckpointInfo(void* pMeta); int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); -int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst); +int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst); -STaskDbWrapper* taskDbOpen(char* path, char* key); +STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); void taskDbDestroy(void* pBackend); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); +void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId); + void* taskDbAddRef(void* pTaskDb); void taskDbRemoveRef(void* pTaskDb); @@ -182,7 +184,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst); int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); -int32_t taskDbBuildSnap(void* arg, int64_t chkpId); +int32_t taskDbBuildSnap(void* arg); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 78f67d06d5..39c423b7fb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -450,6 +450,60 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } +int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) { + // impl later + int32_t code = 0; + + /*param@1: checkpointId dir + param@2: state + copy pChkpIdDir's file to state dir + opt to set hard link to previous file + */ + + char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); + sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); + + if (!taosIsDir(prefixPath)) { + code = taosMkDir(prefixPath); + ASSERT(code == 0); + } + + char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256); + sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "defaultPath"); + if (!taosIsDir(defaultPath)) { + taosMulMkDir(defaultPath); + } + + if (chkpId != 0) { + char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); + sprintf(chkpPath, "%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); + if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { + if (taosIsDir(defaultPath)) { + // remove dir if exists + // taosRenameFile(const char *oldName, const char *newName) + taosRemoveDir(defaultPath); + } + taosMkDir(defaultPath); + code = copyFiles(chkpPath, defaultPath); + if (code != 0) { + qError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + qInfo("start to restart stream backend at checkpoint path: %s", chkpPath); + } + + } else { + qError("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, + tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + taosMkDir(defaultPath); + } + taosMemoryFree(chkpPath); + } + + *dbPath = defaultPath; + *dbPrefixPath = prefixPath; + + return 0; +} void* streamBackendInit(const char* streamPath, int64_t chkpId) { char* backendPath = NULL; @@ -829,13 +883,13 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) { return 0; // SArray* pHandle = taosArrayInit(16, POINTER_BYTES); - // void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + // void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL); // while (pIter) { // int64_t id = *(int64_t*)pIter; // SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id); // if (wrapper == NULL) { - // pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + // pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); // continue; // } @@ -940,7 +994,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return code; } - sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId); + sprintf(pChkpIdDir, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId); if (taosIsDir(pChkpIdDir)) { qInfo("stream rm exist checkpoint%s", pChkpIdDir); taosRemoveFile(pChkpIdDir); @@ -950,19 +1004,19 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } -int32_t taskDbBuildSnap(void* arg, int64_t chkpId) { +int32_t taskDbBuildSnap(void* arg) { SStreamMeta* pMeta = arg; - void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL); int32_t code = 0; while (pIter) { STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; taskDbAddRef(pTaskDb); - code = taskDbDoCheckpoint((STaskDbWrapper*)pTaskDb, chkpId); + code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId); taskDbRemoveRef(pTaskDb); - pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); } return 0; } @@ -1070,6 +1124,8 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir); + pTaskDb->chkpId = chkpId; + _EXIT: taosMemoryFree(pChkpDir); taosMemoryFree(pChkpIdDir); @@ -1692,14 +1748,20 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta *stateFullPath = statePath; return 0; } -STaskDbWrapper* taskDbOpen(char* path, char* key) { +void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { + STaskDbWrapper* p = pTaskDb; + taosThreadMutexLock(&p->mutex); + p->chkpId = chkpId; + taosThreadMutexUnlock(&p->mutex); +} +STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { char* statePath = NULL; char* err = NULL; char* dbPath = NULL; char** cfNames = NULL; size_t nCf = 0; - if (taskDbBuildFullPath(path, key, &dbPath, &statePath) != 0) { + if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) { return NULL; } @@ -1861,12 +1923,12 @@ _EXIT: return code; } -int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) { +int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) { int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); int32_t code = 0; - STaskDbWrapper* pTaskDb = taskDbOpen(path, key); + STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0); RocksdbCfInst* pSrcBackend = pCfInst; for (int i = 0; i < nCf; i++) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9d76951adb..016f446fe9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -182,7 +182,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { return ret; } -int32_t streamMetaConvertBackendFormat(SStreamMeta* pMeta) { +int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { int32_t code = 0; int64_t chkpId = streamGetLatestCheckpointId(pMeta); SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId); @@ -190,7 +190,7 @@ int32_t streamMetaConvertBackendFormat(SStreamMeta* pMeta) { void* pIter = taosHashIterate(pBackend->cfInst, NULL); while (pIter) { void* key = taosHashGetKey(pIter, NULL); - code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter); + code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter); if (code != 0) { qError("failed to cvt data"); goto _EXIT; @@ -210,7 +210,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { } else if (compatible == STREAM_STATA_NEED_CONVERT) { qInfo("stream state need covert backend format"); - return streamMetaConvertBackendFormat(pMeta); + return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { qError( "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " @@ -222,9 +222,9 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } -void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) { +void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref) { taosThreadMutexLock(&pMeta->backendMutex); - void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key)); + void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key)); if (ppBackend != NULL && *ppBackend != NULL) { taskDbAddRef(*ppBackend); *ref = ((STaskDbWrapper*)*ppBackend)->refId; @@ -232,7 +232,7 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) return *ppBackend; } - void* pBackend = taskDbOpen(pMeta->path, key); + void* pBackend = taskDbOpen(pMeta->path, key, chkpId); if (pBackend == NULL) { taosThreadMutexUnlock(&pMeta->backendMutex); return NULL; @@ -240,7 +240,7 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) *ref = taosAddRef(taskDbWrapperId, pBackend); - taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); + taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); return pBackend; } @@ -301,8 +301,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->hbInfo.tickCounter = 0; pMeta->hbInfo.stopFlag = 0; - pMeta->pTaskBackendUnique = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); // start backend // taosInitRWLatch(&pMeta->chkpDirLock); @@ -411,7 +410,7 @@ void streamMetaClear(SStreamMeta* pMeta) { taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosHashClear(pMeta->pTasks); - taosHashClear(pMeta->pTaskBackendUnique); + taosHashClear(pMeta->pTaskDbUnique); taosArrayClear(pMeta->pTaskList); taosArrayClear(pMeta->chkpSaved); @@ -452,7 +451,7 @@ void streamMetaCloseImpl(void* arg) { taosArrayDestroy(pMeta->chkpInUse); taosHashCleanup(pMeta->pTasks); - taosHashCleanup(pMeta->pTaskBackendUnique); + taosHashCleanup(pMeta->pTaskDbUnique); taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 112589613f..676effe778 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -110,7 +110,7 @@ const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; static int64_t kBlockSize = 64 * 1024; -int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta); +int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta); void streamSnapHandleDestroy(SStreamSnapHandle* handle); // static void streamBuildFname(char* path, char* file, char* fullname) @@ -139,7 +139,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskDbBuildSnap(arg, chkpId); } +int32_t streamTaskDbGetSnapInfo(void* arg, char* path) { return taskDbBuildSnap(arg); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { @@ -235,20 +235,23 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { taosCloseDir(&pDir); return 0; } -int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { +int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { // SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); int32_t code = -1; - char* snapPath = taosMemoryCalloc(1, strlen(path) + 256); - sprintf(snapPath, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, pSnap->streamId, - pSnap->taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, pSnap->chkpId); - if (taosIsDir(snapPath)) { + char* path = taosMemoryCalloc(1, strlen(metaPath) + 256); + char idstr[64] = {0}; + sprintf(idstr, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)(pSnap->taskId)); + + sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", pSnap->chkpId); + if (taosIsDir(path)) { goto _ERROR; } pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); - pSnapFile->path = snapPath; + pSnapFile->path = path; pSnapFile->snapInfo = *pSnap; if ((code = snapFileReadMeta(pSnapFile)) != 0) { goto _ERROR; @@ -262,7 +265,7 @@ int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSn code = 0; _ERROR: - taosMemoryFree(snapPath); + taosMemoryFree(path); return code; } void snapFileDestroy(SBackendSnapFile2* pSnap) { @@ -288,11 +291,11 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { return; } -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later SArray* pSnapSet = NULL; - int32_t code = streamBackendGetSnapInfo(pMeta, path, chkpId); + int32_t code = streamTaskDbGetSnapInfo(pMeta, path); if (code != 0) { return -1; } @@ -339,7 +342,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa return TSDB_CODE_OUT_OF_MEMORY; } - if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) { + if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) { taosMemoryFree(pReader); return -1; } @@ -531,9 +534,16 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); if (pDbSnapFile->inited == 0) { + char idstr[64] = {0}; + sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId)); + char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256); - sprintf(path, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, snapInfo.streamId, - snapInfo.taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, snapInfo.chkpId); + sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", path, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", snapInfo.chkpId); + if (!taosIsDir(path)) { + code = taosMulMkDir(path); + ASSERT(code == 0); + } pDbSnapFile->path = path; pDbSnapFile->snapInfo = snapInfo; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index d4ac1838d0..b8158c774e 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -116,7 +116,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz // taosWLockLatch(&pMeta->lock); // taosThreadMutexLock(&pMeta->backendMutex); // void* uniqueId = - // taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); + // taosHashGet(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); // if (uniqueId == NULL) { // int code = streamStateOpenBackend(pMeta->streamBackend, pState); // if (code == -1) { @@ -124,7 +124,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz // taosMemoryFree(pState); // return NULL; // } - // taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, + // taosHashPut(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, // &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId)); // } else { // int64_t id = *(int64_t*)uniqueId; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5b4b0c309d..8da515a650 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -392,8 +392,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i taosThreadMutexInit(&pTask->lock, NULL); streamTaskOpenAllUpstreamInput(pTask); - // pTask->pBackend = taskDbOpen(pMeta->path, (char*)pTask->id.idStr); - return TSDB_CODE_SUCCESS; }