Merge pull request #24056 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2023-12-14 13:52:04 +08:00 committed by GitHub
commit 05ca8f1784
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 52 additions and 106 deletions

View File

@ -509,10 +509,7 @@ typedef struct SStreamMeta {
SArray* chkpSaved;
SArray* chkpInUse;
SRWLatch chkpDirLock;
void* qHandle;
int32_t pauseTaskNum;
void* bkdChkptMgt;
} SStreamMeta;
@ -840,11 +837,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int3
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta);
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);

View File

@ -699,7 +699,23 @@ end:
return ret;
}
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
static STaskId replaceStreamTaskId(SStreamTask* pTask) {
ASSERT(pTask->info.fillHistory);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
return id;
}
static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
ASSERT(pTask->info.fillHistory);
pTask->id.taskId = pId->taskId;
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
int32_t vgId = TD_VID(pTq->pVnode);
@ -713,15 +729,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
streamTaskOpenAllUpstreamInput(pTask);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pStateTask = pTask;
STaskId taskId = {.streamId = 0, .taskId = 0};
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@ -731,9 +741,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
restoreStreamTaskId(pTask, &taskId);
}
SReadHandle handle = {
@ -754,15 +764,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTask* pSateTask = pTask;
// SStreamTask task = {0};
STaskId taskId = {.streamId = 0, .taskId = 0};
STaskId taskId = {0};
if (pTask->info.fillHistory) {
taskId.streamId = pTask->id.streamId;
taskId.taskId = pTask->id.taskId;
pTask->id.streamId = pTask->streamTaskId.streamId;
pTask->id.taskId = pTask->streamTaskId.taskId;
taskId = replaceStreamTaskId(pTask);
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@ -774,15 +778,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
}
if (pTask->info.fillHistory) {
pTask->id.streamId = taskId.streamId;
pTask->id.taskId = taskId.taskId;
restoreStreamTaskId(pTask, &taskId);
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
SReadHandle handle = {
.checkpointId = pTask->chkInfo.checkpointId,
.vnode = NULL,
.numOfVgroups = numOfVgroups,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList),
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
@ -828,12 +830,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
}
// // reset the task status from unfinished transaction
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
// pTask->status.taskStatus = TASK_STATUS__READY;
// }
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;

View File

@ -181,5 +181,5 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta);
return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta);
}

View File

@ -736,17 +736,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
streamMetaClear(pMeta);
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pMeta);
@ -758,10 +750,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
if (isLeader && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
resetStreamTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
startStreamTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);

View File

@ -5466,11 +5466,10 @@ bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
}
int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
const int32_t BLOCK_DIST_RESULT_ROWS = 24;
const int32_t BLOCK_DIST_RESULT_ROWS = 25;
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo);

View File

@ -31,7 +31,6 @@ int32_t streamMetaId = 0;
int32_t taskDbWrapperId = 0;
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
@ -395,41 +394,6 @@ _err:
return NULL;
}
int32_t streamMetaReopen(SStreamMeta* pMeta) {
streamMetaClear(pMeta);
// NOTE: role should not be changed during reopen meta
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(defaultPath);
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
int32_t code = taosStatFile(newPath, NULL, NULL, NULL);
if (code == 0) {
// directory exists
code = taosRenameFile(newPath, defaultPath);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
tstrerror(terrno));
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return -1;
}
}
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
// todo refactor: the lock shoud be restricted in one function
void streamMetaInitBackend(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
@ -829,28 +793,27 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
taosArrayDestroy(pRecycleList);
}
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
if (pMeta == NULL) return 0;
return streamMetaLoadAllTasks(pMeta);
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
return -1;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
if (pMeta == NULL) {
return TSDB_CODE_SUCCESS;
}
SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
taosArrayDestroy(pRecycleList);
return -1;
}
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {