Merge branch 'enh/chkpTransfer' of https://github.com/taosdata/TDengine into enh/chkpTransfer

This commit is contained in:
yihaoDeng 2023-08-10 14:57:12 +08:00
commit ea00f676fc
8 changed files with 183 additions and 111 deletions

View File

@ -369,7 +369,7 @@ typedef struct SStreamMeta {
int32_t chkptNotReadyTasks;
int64_t checkpointId;
int64_t checkpointId;
SArray* checkpointSaved;
SArray* checkpointInUse;
int32_t checkpointCap;
@ -601,6 +601,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId);
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
@ -610,6 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);

View File

@ -33,6 +33,7 @@
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
@ -1129,7 +1130,6 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
}
pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0);
@ -1165,7 +1165,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;

View File

@ -93,7 +93,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h
@ -181,8 +181,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta);
int64_t metaGetTbNum(SMeta *pMeta);
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
int64_t metaGetTbNum(SMeta* pMeta);
void metaReaderDoInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
@ -329,6 +329,8 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData);
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter);
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback);
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId);
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================

View File

@ -135,14 +135,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver;
pWriter->ever = ever;
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received");
taosMkDir(tdir);
SStreamSnapWriter* pSnapWriter = NULL;
if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) {
goto _err;
}
tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode));
tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir);
pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
@ -158,6 +162,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
taosMemoryFree(pWriter);
return code;
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) {
return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId);
}
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;

View File

@ -517,7 +517,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_STREAM_STATE: {
case SNAP_DATA_STREAM_STATE_BACKEND: {
if (pWriter->pStreamStateWriter == NULL) {
code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
if (code) goto _err;

View File

@ -158,7 +158,6 @@ int32_t copyFiles(const char* src, const char* dst) {
if (pDir == NULL) return 0;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
@ -186,7 +185,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later
int32_t code = 0;
// chkpId = 0;
/*param@1: checkpointId dir
param@2: state
copy checkpointdir's file to state dir
opt to set hard link to previous file
*/
char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
if (chkpId != 0) {
@ -1471,10 +1474,11 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr);
@ -1509,9 +1513,9 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp);
return code;
@ -1525,10 +1529,8 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
uint64_t groupId = pKey->groupId;
int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == 0) {
if (pKey->groupId == groupId) {
return 0;
}
if (code == 0 && pKey->groupId == groupId) {
return 0;
}
return -1;
}
@ -1630,6 +1632,9 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
if (code != 0) {
return NULL;
}
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
@ -1666,9 +1671,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
@ -1717,8 +1723,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
}
return code;
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
@ -1728,21 +1732,21 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
SSessionKey resKey = *key;
void* tmp = NULL;
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (key->win.skey != resKey.win.skey) {
code = -1;
} else {
*key = resKey;
*pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0 && key->win.skey != resKey.win.skey) {
*key = resKey;
if (pVal) {
*pVal = tmp;
tmp = NULL;
};
if (pVLen) *pVLen = vLen;
} else {
code = -1;
}
taosMemoryFree(tmp);
streamStateFreeCur(pCur);
// impl later
return code;
}
@ -1788,8 +1792,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev
// %s", toString);
streamStateFreeCur(pCur);
return NULL;
}
@ -1807,10 +1809,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
@ -1858,6 +1860,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
@ -2393,8 +2396,8 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
int i = streamStateGetCfIdx(pState, cfKeyName);
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfKeyName);
return -1;

View File

@ -36,6 +36,14 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendCfWrapperId);
}
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) {
int32_t code = 0;
int32_t nTask = taosHashGetSize(pMeta->pTasks);
assert(nTask == 0);
return code;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -44,22 +52,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return NULL;
}
int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath);
char* tpath = taosMemoryCalloc(1, strlen(path) + 64);
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err;
}
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
}
@ -90,14 +89,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
// memset(streamPath, 0, len);
// sprintf(streamPath, "%s/%s", pMeta->path, "state");
// code = taosMulModeMkDir(streamPath, 0755);
// if (code != 0) {
// terrno = TAOS_SYSTEM_ERROR(code);
// goto _err;
// }
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
@ -119,15 +110,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
return pMeta;
_err:
taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@ -140,6 +128,49 @@ _err:
return NULL;
}
void streamMetaReopen(SStreamMeta** ppMeta) {
SStreamMeta* pMeta = *ppMeta;
SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
pNewMeta->path = taosStrdup(pMeta->path);
pNewMeta->vgId = pMeta->vgId;
pNewMeta->walScanCounter = 0;
pNewMeta->ahandle = pMeta->ahandle;
pNewMeta->expandFunc = pMeta->expandFunc;
*ppMeta = pNewMeta;
streamMetaClose(pMeta);
// tdbAbort(pMeta->db, pMeta->txn);
// tdbTbClose(pMeta->pTaskDb);
// tdbTbClose(pMeta->pCheckpointDb);
// tdbClose(pMeta->db);
// void* pIter = NULL;
// while (1) {
// pIter = taosHashIterate(pMeta->pTasks, pIter);
// if (pIter == NULL) {
// break;
// }
// SStreamTask* pTask = *(SStreamTask**)pIter;
// if (pTask->schedTimer) {
// taosTmrStop(pTask->schedTimer);
// pTask->schedTimer = NULL;
// }
// if (pTask->launchTaskTimer) {
// taosTmrStop(pTask->launchTaskTimer);
// pTask->launchTaskTimer = NULL;
// }
// tFreeStreamTask(pTask);
// }
// taosHashClear(pMeta->pTasks);
// taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
}
void streamMetaClose(SStreamMeta* pMeta) {
tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb);

View File

@ -89,6 +89,24 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
} while (0)
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL);
taosMemoryFree(fullname);
return ret;
}
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
return taosOpenFile(fullname, opt);
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
// impl later
int len = strlen(path);
@ -129,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
continue;
}
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
pFile->pMainfest = taosStrdup(name);
pFile->pOptions = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
@ -157,50 +175,39 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
// current
item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
if (taosStatFile(pFile->pCurrent, &item.size, NULL) != 0) {
qError("stream-state failed to get file size: %s", pFile->pCurrent);
}
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// mainfest
item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
taosStatFile(pFile->pMainfest, &item.size, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// options
item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
taosStatFile(pFile->pOptions, &item.size, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
taosStatFile(sst, &item.size, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
}
// meta
item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
taosStatFile(pFile->pCheckpointMeta, &item.size, NULL);
taosArrayPush(list, &item);
if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) {
taosArrayPush(list, &item);
}
pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0;
pHandle->pFileList = list;
char fullname[256] = {0};
char* file = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name;
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
if (pHandle->fd == NULL) {
qError("stream-state failed to open %s, reason: %s", tdir, tstrerror(errno));
tdir = NULL;
goto _err;
}
qDebug("stream-state open file %s, current offset %" PRId64 "", file, (int64_t)0);
pHandle->seraial = 0;
pHandle->offset = 0;
return 0;
@ -264,29 +271,35 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
qDebug("stream-state start to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
if (pHandle->fd == NULL) {
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
*ppData = NULL;
*size = 0;
return 0;
} else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
}
qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type,
qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type,
tstrerror(code));
qDebug("stream-state failed to read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d",
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
return code;
// handle later
return -1;
} else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize
qDebug("stream-state read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd);
qDebug("stream-state close file no.%d, move to next file, next file no.%d", pHandle->currFileIdx,
pHandle->currFileIdx + 1);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
}
@ -303,15 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return 0;
}
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
pHandle->offset += nread;
qDebug("read file %s, current offset %" PRId64 ", size : % " PRId64 ", file no. %d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
@ -349,7 +360,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->pFileList = list;
pHandle->currFileIdx = 0;
pHandle->offset = 0;
pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE);
*ppWriter = pWriter;
return 0;
}
@ -360,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx);
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) {
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (pHandle->fd == NULL) {
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
return code;
}
pHandle->offset += pHdr->size;
pHandle->offset += bytes;
} else {
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
@ -378,10 +400,13 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
item.type = pHdr->type;
taosArrayPush(pHandle->pFileList, &item);
char fullname[256] = {0};
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name;
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
pHandle->offset += pHdr->size;
@ -406,6 +431,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
qDebug("stream snap get file list, %s", buf);
taosMemoryFree(buf);
}
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
taosMemoryFree(item->name);