refactor: do some internal refactor.
This commit is contained in:
parent
105542bb21
commit
81f96603b0
|
@ -840,11 +840,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int3
|
|||
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaReopen(SStreamMeta* pMeta);
|
||||
void streamMetaClear(SStreamMeta* pMeta);
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
|
||||
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||
|
|
|
@ -181,5 +181,5 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
|
|||
}
|
||||
|
||||
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
|
||||
return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta);
|
||||
return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
|
||||
}
|
||||
|
|
|
@ -712,9 +712,9 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
while(1) {
|
||||
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
||||
|
@ -736,17 +736,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
code = streamMetaReopen(pMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d failed to reopen stream meta", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
streamMetaClear(pMeta);
|
||||
|
||||
streamMetaInitBackend(pMeta);
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
|
||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
|
||||
|
||||
code = streamMetaLoadAllTasks(pMeta);
|
||||
|
|
|
@ -31,7 +31,6 @@ int32_t streamMetaId = 0;
|
|||
int32_t taskDbWrapperId = 0;
|
||||
|
||||
static void metaHbToMnode(void* param, void* tmrId);
|
||||
static void streamMetaClear(SStreamMeta* pMeta);
|
||||
static int32_t streamMetaBegin(SStreamMeta* pMeta);
|
||||
static void streamMetaCloseImpl(void* arg);
|
||||
|
||||
|
@ -395,41 +394,6 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t streamMetaReopen(SStreamMeta* pMeta) {
|
||||
streamMetaClear(pMeta);
|
||||
|
||||
// NOTE: role should not be changed during reopen meta
|
||||
pMeta->streamBackendRid = -1;
|
||||
pMeta->streamBackend = NULL;
|
||||
|
||||
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
|
||||
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
|
||||
taosRemoveDir(defaultPath);
|
||||
|
||||
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
|
||||
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
|
||||
|
||||
int32_t code = taosStatFile(newPath, NULL, NULL, NULL);
|
||||
if (code == 0) {
|
||||
// directory exists
|
||||
code = taosRenameFile(newPath, defaultPath);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
|
||||
tstrerror(terrno));
|
||||
|
||||
taosMemoryFree(defaultPath);
|
||||
taosMemoryFree(newPath);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(defaultPath);
|
||||
taosMemoryFree(newPath);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo refactor: the lock shoud be restricted in one function
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta) {
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
|
||||
|
@ -829,28 +793,27 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
|
|||
taosArrayDestroy(pRecycleList);
|
||||
}
|
||||
|
||||
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
|
||||
if (pMeta == NULL) return 0;
|
||||
|
||||
return streamMetaLoadAllTasks(pMeta);
|
||||
}
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||
TBC* pCur = NULL;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
stInfo("vgId:%d load stream tasks from meta files", vgId);
|
||||
|
||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* pKey = NULL;
|
||||
int32_t kLen = 0;
|
||||
void* pVal = NULL;
|
||||
int32_t vLen = 0;
|
||||
SDecoder decoder;
|
||||
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||
|
||||
if (pMeta == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
|
||||
int32_t vgId = pMeta->vgId;
|
||||
stInfo("vgId:%d load stream tasks from meta files", vgId);
|
||||
|
||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
|
|
Loading…
Reference in New Issue