support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-03 08:24:50 +00:00
parent 4aef18733f
commit f9d0874e45
5 changed files with 102 additions and 60 deletions

View File

@ -369,7 +369,7 @@ typedef struct SStreamMeta {
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t checkpointId; int64_t checkpointId;
SArray* checkpointSaved; SArray* checkpointSaved;
SArray* checkpointInUse; SArray* checkpointInUse;
int32_t checkpointCap; int32_t checkpointCap;
@ -601,6 +601,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId);
void streamMetaInit(); void streamMetaInit();
void streamMetaCleanup(); void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
@ -610,6 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);

View File

@ -93,7 +93,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3 #define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h // vnd.h
@ -181,8 +181,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int64_t metaGetTbNum(SMeta *pMeta); int64_t metaGetTbNum(SMeta* pMeta);
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderDoInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
@ -329,6 +329,8 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData);
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter); int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter);
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback); int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback);
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId);
// SStreamTaskReader ====================================== // SStreamTaskReader ======================================
// SStreamStateWriter ===================================== // SStreamStateWriter =====================================
// SStreamStateReader ===================================== // SStreamStateReader =====================================

View File

@ -162,6 +162,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) {
return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId);
}
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;

View File

@ -158,7 +158,6 @@ int32_t copyFiles(const char* src, const char* dst) {
if (pDir == NULL) return 0; if (pDir == NULL) return 0;
TdDirEntryPtr de = NULL; TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) { while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de); char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
@ -186,7 +185,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later // impl later
int32_t code = 0; int32_t code = 0;
// chkpId = 0; /*param@1: checkpointId dir
param@2: state
copy checkpointdir's file to state dir
opt to set hard link to previous file
*/
char* state = taosMemoryCalloc(1, strlen(path) + 32); char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
if (chkpId != 0) { if (chkpId != 0) {
@ -1469,10 +1472,11 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
char sKeyStr[128] = {0};
char eKeyStr[128] = {0}; char sKeyStr[128] = {0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; char eKeyStr[128] = {0};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
int sLen = stateKeyEncode(&sKey, sKeyStr); int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr);
@ -1507,9 +1511,9 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetFirst_rocksdb"); qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0}; SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0); streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp); streamStateDel_rocksdb(pState, &tmp);
return code; return code;
@ -1523,10 +1527,8 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
uint64_t groupId = pKey->groupId; uint64_t groupId = pKey->groupId;
int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == 0) { if (code == 0 && pKey->groupId == groupId) {
if (pKey->groupId == groupId) { return 0;
return 0;
}
} }
return -1; return -1;
} }
@ -1628,6 +1630,9 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
if (code != 0) {
return NULL;
}
char buf[128] = {0}; char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
@ -1664,9 +1669,10 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->db = wrapper->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1715,8 +1721,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
int code = 0; int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
if (code == -1) {
}
return code; return code;
} }
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
@ -1726,21 +1730,21 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
SSessionKey resKey = *key; SSessionKey resKey = *key;
void* tmp = NULL; void* tmp = NULL;
int32_t vLen = 0; int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (key->win.skey != resKey.win.skey) { code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
code = -1; if (code == 0 && key->win.skey != resKey.win.skey) {
} else { *key = resKey;
*key = resKey;
*pVal = taosMemoryCalloc(1, *pVLen); if (pVal) {
memcpy(*pVal, tmp, *pVLen); *pVal = tmp;
} tmp = NULL;
};
if (pVLen) *pVLen = vLen;
} else {
code = -1;
} }
taosMemoryFree(tmp); taosMemoryFree(tmp);
streamStateFreeCur(pCur);
// impl later
return code; return code;
} }
@ -1786,8 +1790,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
// qWarn("streamState failed to seek key prev
// %s", toString);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
@ -1805,10 +1807,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number; pCur->number = pState->number;
char buf[128] = {0}; char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
@ -1856,6 +1858,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0}; SStateSessionKey curKey = {0};
@ -2391,8 +2394,8 @@ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rock
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) { void* val, int32_t vlen, int64_t ttl) {
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
int i = streamStateGetCfIdx(pState, cfKeyName);
int i = streamStateGetCfIdx(pState, cfKeyName);
if (i < 0) { if (i < 0) {
qError("streamState failed to put to cf name:%s", cfKeyName); qError("streamState failed to put to cf name:%s", cfKeyName);
return -1; return -1;

View File

@ -36,6 +36,14 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamBackendCfWrapperId);
} }
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) {
int32_t code = 0;
int32_t nTask = taosHashGetSize(pMeta->pTasks);
assert(nTask == 0);
return code;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1; int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -44,22 +52,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return NULL; return NULL;
} }
int32_t len = strlen(path) + 20; char* tpath = taosMemoryCalloc(1, strlen(path) + 64);
char* streamPath = taosMemoryCalloc(1, len); sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = tpath;
pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err; goto _err;
} }
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err; goto _err;
} }
@ -90,14 +89,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
// memset(streamPath, 0, len);
// sprintf(streamPath, "%s/%s", pMeta->path, "state");
// code = taosMulModeMkDir(streamPath, 0755);
// if (code != 0) {
// terrno = TAOS_SYSTEM_ERROR(code);
// goto _err;
// }
pMeta->pTaskBackendUnique = pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t)); pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
@ -119,15 +110,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL); taosThreadMutexInit(&pMeta->backendMutex, NULL);
return pMeta; return pMeta;
_err: _err:
taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@ -140,6 +128,49 @@ _err:
return NULL; return NULL;
} }
void streamMetaReopen(SStreamMeta** ppMeta) {
SStreamMeta* pMeta = *ppMeta;
SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
pNewMeta->path = taosStrdup(pMeta->path);
pNewMeta->vgId = pMeta->vgId;
pNewMeta->walScanCounter = 0;
pNewMeta->ahandle = pMeta->ahandle;
pNewMeta->expandFunc = pMeta->expandFunc;
*ppMeta = pNewMeta;
streamMetaClose(pMeta);
// tdbAbort(pMeta->db, pMeta->txn);
// tdbTbClose(pMeta->pTaskDb);
// tdbTbClose(pMeta->pCheckpointDb);
// tdbClose(pMeta->db);
// void* pIter = NULL;
// while (1) {
// pIter = taosHashIterate(pMeta->pTasks, pIter);
// if (pIter == NULL) {
// break;
// }
// SStreamTask* pTask = *(SStreamTask**)pIter;
// if (pTask->schedTimer) {
// taosTmrStop(pTask->schedTimer);
// pTask->schedTimer = NULL;
// }
// if (pTask->launchTaskTimer) {
// taosTmrStop(pTask->launchTaskTimer);
// pTask->launchTaskTimer = NULL;
// }
// tFreeStreamTask(pTask);
// }
// taosHashClear(pMeta->pTasks);
// taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
}
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);