diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index efc7901d4a..51a31f72ed 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -369,7 +369,7 @@ typedef struct SStreamMeta { int32_t chkptNotReadyTasks; - int64_t checkpointId; + int64_t checkpointId; SArray* checkpointSaved; SArray* checkpointInUse; int32_t checkpointCap; @@ -601,6 +601,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId); void streamMetaInit(); void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); + void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); @@ -610,6 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); + int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d0300c622f..881bdef152 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -33,6 +33,7 @@ #define MND_STREAM_MAX_NUM 60 +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); @@ -1129,7 +1130,6 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } } - pStream->checkpointFreq = checkpointId; pStream->checkpointId = checkpointId; pStream->checkpointFreq = taosGetTimestampMs(); atomic_store_64(&pStream->currentTick, 0); @@ -1165,7 +1165,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME); if (pTrans == NULL) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2f37048d5c..cb7af681ee 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -93,7 +93,7 @@ typedef struct SQueryNode SQueryNode; #define VNODE_BUFPOOL_SEGMENTS 3 -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" // vnd.h @@ -181,8 +181,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); -int64_t metaGetTbNum(SMeta *pMeta); -void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); +int64_t metaGetTbNum(SMeta* pMeta); +void metaReaderDoInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); @@ -329,6 +329,8 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData); int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter); int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback); int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId); + // SStreamTaskReader ====================================== // SStreamStateWriter ===================================== // SStreamStateReader ===================================== diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index e0d58176ed..3478928c4f 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -135,14 +135,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); + sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received"); + taosMkDir(tdir); + SStreamSnapWriter* pSnapWriter = NULL; if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) { goto _err; } - tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir); pWriter->pWriterImpl = pSnapWriter; + + *ppWriter = pWriter; return code; _err: tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); @@ -158,6 +162,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) taosMemoryFree(pWriter); return code; } +int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { + return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId); +} int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 496a151c6e..16bd233807 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -517,7 +517,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData); if (code) goto _err; } break; - case SNAP_DATA_STREAM_STATE: { + case SNAP_DATA_STREAM_STATE_BACKEND: { if (pWriter->pStreamStateWriter == NULL) { code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter); if (code) goto _err; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 6cd81058da..c5f9f9db35 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -158,7 +158,6 @@ int32_t copyFiles(const char* src, const char* dst) { if (pDir == NULL) return 0; TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; @@ -186,7 +185,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; - // chkpId = 0; + /*param@1: checkpointId dir + param@2: state + copy checkpointdir's file to state dir + opt to set hard link to previous file + */ char* state = taosMemoryCalloc(1, strlen(path) + 32); sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); if (chkpId != 0) { @@ -1471,10 +1474,11 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); @@ -1509,9 +1513,9 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { qDebug("streamStateGetFirst_rocksdb"); SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut_rocksdb(pState, &tmp, NULL, 0); - SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); streamStateFreeCur(pCur); streamStateDel_rocksdb(pState, &tmp); return code; @@ -1525,10 +1529,8 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, uint64_t groupId = pKey->groupId; int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); - if (code == 0) { - if (pKey->groupId == groupId) { - return 0; - } + if (code == 0 && pKey->groupId == groupId) { + return 0; } return -1; } @@ -1630,6 +1632,9 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + if (code != 0) { + return NULL; + } char buf[128] = {0}; int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); @@ -1666,9 +1671,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1717,8 +1723,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); - if (code == -1) { - } return code; } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -1728,21 +1732,21 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo SSessionKey resKey = *key; void* tmp = NULL; int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); - if (code == 0) { - if (pVLen != NULL) *pVLen = vLen; - if (key->win.skey != resKey.win.skey) { - code = -1; - } else { - *key = resKey; - *pVal = taosMemoryCalloc(1, *pVLen); - memcpy(*pVal, tmp, *pVLen); - } + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + if (code == 0 && key->win.skey != resKey.win.skey) { + *key = resKey; + + if (pVal) { + *pVal = tmp; + tmp = NULL; + }; + if (pVLen) *pVLen = vLen; + } else { + code = -1; } + taosMemoryFree(tmp); - streamStateFreeCur(pCur); - // impl later return code; } @@ -1788,8 +1792,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { - // qWarn("streamState failed to seek key prev - // %s", toString); streamStateFreeCur(pCur); return NULL; } @@ -1807,10 +1809,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; - char buf[128] = {0}; - + char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; @@ -1858,6 +1860,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con streamStateFreeCur(pCur); return NULL; } + size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -2393,8 +2396,8 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - int i = streamStateGetCfIdx(pState, cfKeyName); + int i = streamStateGetCfIdx(pState, cfKeyName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfKeyName); return -1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4711f4af19..3cf967a219 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,6 +36,14 @@ void streamMetaCleanup() { taosCloseRef(streamBackendCfWrapperId); } +int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { + int32_t code = 0; + + int32_t nTask = taosHashGetSize(pMeta->pTasks); + assert(nTask == 0); + + return code; +} SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -44,22 +52,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF return NULL; } - int32_t len = strlen(path) + 20; - char* streamPath = taosMemoryCalloc(1, len); - sprintf(streamPath, "%s/%s", path, "stream"); - pMeta->path = taosStrdup(streamPath); + char* tpath = taosMemoryCalloc(1, strlen(path) + 64); + sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); + pMeta->path = tpath; + if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { goto _err; } - - memset(streamPath, 0, len); - sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); - code = taosMulModeMkDir(streamPath, 0755); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } - if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } @@ -90,14 +89,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - // memset(streamPath, 0, len); - // sprintf(streamPath, "%s/%s", pMeta->path, "state"); - // code = taosMulModeMkDir(streamPath, 0755); - // if (code != 0) { - // terrno = TAOS_SYSTEM_ERROR(code); - // goto _err; - // } - pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); @@ -119,15 +110,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - taosMemoryFree(streamPath); - taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); return pMeta; _err: - taosMemoryFree(streamPath); taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -140,6 +128,49 @@ _err: return NULL; } +void streamMetaReopen(SStreamMeta** ppMeta) { + SStreamMeta* pMeta = *ppMeta; + + SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); + pNewMeta->path = taosStrdup(pMeta->path); + pNewMeta->vgId = pMeta->vgId; + pNewMeta->walScanCounter = 0; + pNewMeta->ahandle = pMeta->ahandle; + pNewMeta->expandFunc = pMeta->expandFunc; + + *ppMeta = pNewMeta; + + streamMetaClose(pMeta); + + // tdbAbort(pMeta->db, pMeta->txn); + // tdbTbClose(pMeta->pTaskDb); + // tdbTbClose(pMeta->pCheckpointDb); + // tdbClose(pMeta->db); + + // void* pIter = NULL; + // while (1) { + // pIter = taosHashIterate(pMeta->pTasks, pIter); + // if (pIter == NULL) { + // break; + // } + + // SStreamTask* pTask = *(SStreamTask**)pIter; + // if (pTask->schedTimer) { + // taosTmrStop(pTask->schedTimer); + // pTask->schedTimer = NULL; + // } + + // if (pTask->launchTaskTimer) { + // taosTmrStop(pTask->launchTaskTimer); + // pTask->launchTaskTimer = NULL; + // } + + // tFreeStreamTask(pTask); + // } + + // taosHashClear(pMeta->pTasks); + // taosRemoveRef(streamBackendId, pMeta->streamBackendRid); +} void streamMetaClose(SStreamMeta* pMeta) { tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 44aa69a070..0bf029f574 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -89,6 +89,24 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle); sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \ } while (0) +int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { + int ret = 0; + + char* fullname = taosMemoryCalloc(1, strlen(path) + 32); + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); + + ret = taosStatFile(fullname, sz, NULL); + taosMemoryFree(fullname); + + return ret; +} + +TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { + char fullname[256] = {0}; + STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname); + return taosOpenFile(fullname, opt); +} + int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { // impl later int len = strlen(path); @@ -129,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk continue; } if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) { - pFile->pMainfest = taosStrdup(name); + pFile->pOptions = taosStrdup(name); continue; } if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) && @@ -157,50 +175,39 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk // current item.name = pFile->pCurrent; item.type = ROCKSDB_CURRENT_TYPE; - if (taosStatFile(pFile->pCurrent, &item.size, NULL) != 0) { - qError("stream-state failed to get file size: %s", pFile->pCurrent); - } + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); + // mainfest item.name = pFile->pMainfest; item.type = ROCKSDB_MAINFEST_TYPE; - taosStatFile(pFile->pMainfest, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); + // options item.name = pFile->pOptions; item.type = ROCKSDB_OPTIONS_TYPE; - taosStatFile(pFile->pOptions, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); // sst for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { char* sst = taosArrayGetP(pFile->pSst, i); item.name = sst; item.type = ROCKSDB_SST_TYPE; - taosStatFile(sst, &item.size, NULL); + streamGetFileSize(pFile->path, item.name, &item.size); taosArrayPush(list, &item); } // meta item.name = pFile->pCheckpointMeta; item.type = ROCKSDB_CHECKPOINT_META_TYPE; - taosStatFile(pFile->pCheckpointMeta, &item.size, NULL); - taosArrayPush(list, &item); + if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) { + taosArrayPush(list, &item); + } pHandle->pBackendFile = pFile; pHandle->currFileIdx = 0; pHandle->pFileList = list; - - char fullname[256] = {0}; - char* file = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname); - - pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); - if (pHandle->fd == NULL) { - qError("stream-state failed to open %s, reason: %s", tdir, tstrerror(errno)); - tdir = NULL; - goto _err; - } - qDebug("stream-state open file %s, current offset %" PRId64 "", file, (int64_t)0); pHandle->seraial = 0; pHandle->offset = 0; return 0; @@ -264,29 +271,35 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - qDebug("stream-state start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + if (pHandle->fd == NULL) { + if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) { + // finish + *ppData = NULL; + *size = 0; + return 0; + } else { + pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); + qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + } + } + qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name, + (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream-state snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type, + qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type, tstrerror(code)); - qDebug("stream-state failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); - return code; - // handle later return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize - qDebug("stream-state read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, + qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); - qDebug("stream-state close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx, - pHandle->currFileIdx + 1); pHandle->offset = 0; pHandle->currFileIdx += 1; } @@ -303,15 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return 0; } item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - char fullname[256] = {0}; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname); - pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); + pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); pHandle->offset += nread; - qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -349,7 +360,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path pHandle->pFileList = list; pHandle->currFileIdx = 0; pHandle->offset = 0; - pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE); + *ppWriter = pWriter; return 0; } @@ -360,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; SBanckendFile* pFile = pHandle->pBackendFile; - SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx); - if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) { + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + + if (pHandle->fd == NULL) { + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); + qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + tstrerror(code)); + } + } + + if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { + int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); + if (bytes != pHdr->size) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); return code; } - pHandle->offset += pHdr->size; + pHandle->offset += bytes; } else { taosCloseFile(&pHandle->fd); pHandle->offset = 0; @@ -378,10 +400,13 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa item.type = pHdr->type; taosArrayPush(pHandle->pFileList, &item); - char fullname[256] = {0}; - char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name; - STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname); - pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE); + SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pHandle->fd == NULL) { + code = TAOS_SYSTEM_ERROR(terrno); + qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + tstrerror(code)); + } taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); pHandle->offset += pHdr->size; @@ -406,6 +431,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { qDebug("stream snap get file list, %s", buf); taosMemoryFree(buf); } + for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { SBackendFileItem* item = taosArrayGet(handle->pFileList, i); taosMemoryFree(item->name);