add checkpoint
This commit is contained in:
parent
aad669e33c
commit
e18c9ea672
|
@ -103,7 +103,7 @@ typedef struct {
|
|||
} SStreamQueueItem;
|
||||
|
||||
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver, int64_t checkpointId);
|
||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
|
|
|
@ -168,7 +168,7 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
|||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||
|
||||
// tqStream
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId);
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||
int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||
int32_t tqStreamTasksStatusCheck(STQ* pTq);
|
||||
|
||||
|
|
|
@ -743,7 +743,7 @@ end:
|
|||
|
||||
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId) {
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
||||
|
|
|
@ -144,12 +144,46 @@ SCfInit ginitDict[] = {
|
|||
encodeValueFunc, decodeValueFunc},
|
||||
};
|
||||
|
||||
int32_t streamBackendRebuildDirFromCheckpoint(const char* stateDir, const char* chkpDir) {
|
||||
// impl later
|
||||
bool isValidCheckpoint(const char* dir) { return true; }
|
||||
|
||||
return 0;
|
||||
int32_t copyFiles(const char* src, const char* dst) {
|
||||
int32_t code = 0;
|
||||
// opt later, just hard link
|
||||
int32_t sLen = strlen(src);
|
||||
int32_t dLen = strlen(dst);
|
||||
char* absSrcPath = taosMemoryCalloc(1, sLen + 64);
|
||||
char* absDstPath = taosMemoryCalloc(1, dLen + 64);
|
||||
|
||||
TdDirPtr pDir = taosOpenDir(src);
|
||||
if (pDir == NULL) return 0;
|
||||
|
||||
TdDirEntryPtr de = NULL;
|
||||
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
char* name = taosGetDirEntryName(de);
|
||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
|
||||
|
||||
sprintf(absSrcPath, "%s/%s", src, name);
|
||||
sprintf(absDstPath, "%s/%s", dst, name);
|
||||
if (!taosDirEntryIsDir(de)) {
|
||||
code = taosCopyFile(absSrcPath, absDstPath);
|
||||
if (code == -1) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
memset(absSrcPath, 0, sLen + 64);
|
||||
memset(absDstPath, 0, dLen + 64);
|
||||
}
|
||||
|
||||
_err:
|
||||
taosMemoryFreeClear(absSrcPath);
|
||||
taosMemoryFreeClear(absDstPath);
|
||||
taosCloseDir(&pDir);
|
||||
return code;
|
||||
}
|
||||
void* streamBackendInit(const char* path, int64_t chkpId) {
|
||||
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
||||
// impl later
|
||||
int32_t code = 0;
|
||||
char* state = taosMemoryCalloc(1, strlen(path) + 32);
|
||||
|
||||
|
@ -157,14 +191,14 @@ void* streamBackendInit(const char* path, int64_t chkpId) {
|
|||
if (chkpId != 0) {
|
||||
char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
|
||||
sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId);
|
||||
if (taosIsDir(chkp)) {
|
||||
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
|
||||
if (taosIsDir(state)) {
|
||||
// remove dir if exists
|
||||
// taosRenameFile(const char *oldName, const char *newName)
|
||||
taosRemoveDir(state);
|
||||
}
|
||||
taosMkDir(state);
|
||||
code = streamBackendRebuildDirFromCheckpoint(state, chkp);
|
||||
code = copyFiles(chkp, state);
|
||||
if (code != 0) {
|
||||
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
}
|
||||
|
@ -174,9 +208,16 @@ void* streamBackendInit(const char* path, int64_t chkpId) {
|
|||
taosMkDir(state);
|
||||
}
|
||||
}
|
||||
*dst = state;
|
||||
|
||||
return 0;
|
||||
}
|
||||
void* streamBackendInit(const char* path, int64_t chkpId) {
|
||||
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
|
||||
|
||||
char* state = NULL;
|
||||
int32_t code = rebuildDirFromCheckpoint(path, chkpId, &state);
|
||||
|
||||
qDebug("start to init stream backend at %s", state);
|
||||
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
|
||||
pHandle->list = tdListNew(sizeof(SCfComparator));
|
||||
|
@ -232,6 +273,7 @@ void* streamBackendInit(const char* path, int64_t chkpId) {
|
|||
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||
}
|
||||
qDebug("succ to init stream backend at %s, backend:%p", state, pHandle);
|
||||
taosMemoryFreeClear(state);
|
||||
|
||||
return (void*)pHandle;
|
||||
_EXIT:
|
||||
|
@ -243,6 +285,7 @@ _EXIT:
|
|||
taosHashCleanup(pHandle->cfInst);
|
||||
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
|
||||
tdListFree(pHandle->list);
|
||||
taosMemoryFree(state);
|
||||
taosMemoryFree(pHandle);
|
||||
qDebug("failed to init stream backend at %s", path);
|
||||
return NULL;
|
||||
|
|
|
@ -249,7 +249,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
|
|||
|
||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||
if (p == NULL) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver, checkpointId) < 0) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
@ -447,8 +447,6 @@ _err:
|
|||
return chkpId;
|
||||
}
|
||||
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||
int64_t checkpointId = 0;
|
||||
|
||||
TBC* pCur = NULL;
|
||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||
return -1;
|
||||
|
@ -477,7 +475,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
|||
// remove duplicate
|
||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||
if (p == NULL) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer, checkpointId) < 0) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
|
|
Loading…
Reference in New Issue