refact task backend
This commit is contained in:
parent
2fa91341a7
commit
f49a2cbeb0
|
@ -168,10 +168,8 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
|
||||||
}
|
}
|
||||||
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
|
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
|
||||||
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
|
// int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
|
||||||
if (code == 0) {
|
int32_t code = streamStateLoadTasks(pWriter);
|
||||||
code = streamStateLoadTasks(pWriter);
|
|
||||||
}
|
|
||||||
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -256,6 +256,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
if (code) {
|
if (code) {
|
||||||
pReader->streamStateDone = 1;
|
pReader->streamStateDone = 1;
|
||||||
pReader->pStreamStateReader = NULL;
|
pReader->pStreamStateReader = NULL;
|
||||||
|
code = 0;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -922,7 +922,8 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32
|
||||||
rocksdb_flushoptions_destroy(flushOpt);
|
rocksdb_flushoptions_destroy(flushOpt);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
|
|
||||||
|
int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char* pChkpDir = taosMemoryCalloc(1, 256);
|
char* pChkpDir = taosMemoryCalloc(1, 256);
|
||||||
char* pChkpIdDir = taosMemoryCalloc(1, 256);
|
char* pChkpIdDir = taosMemoryCalloc(1, 256);
|
||||||
|
@ -1026,7 +1027,7 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) {
|
||||||
|
|
||||||
char* pChkpDir = NULL;
|
char* pChkpDir = NULL;
|
||||||
char* pChkpIdDir = NULL;
|
char* pChkpIdDir = NULL;
|
||||||
if (chkpPreCheckDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
|
if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
// Get all cf and acquire cfWrappter
|
// Get all cf and acquire cfWrappter
|
||||||
|
|
|
@ -806,7 +806,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// do duplicate task check.
|
// do duplicate task checke
|
||||||
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
||||||
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
|
|
@ -270,7 +270,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
|
||||||
|
|
||||||
if (handle->checkpointId == 0) {
|
if (handle->checkpointId == 0) {
|
||||||
// del tmp dir
|
// del tmp dir
|
||||||
if (taosIsDir(pFile->path)) {
|
if (pFile && taosIsDir(pFile->path)) {
|
||||||
taosRemoveDir(pFile->path);
|
taosRemoveDir(pFile->path);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue