From 391103bc5a3d34fc9681c57baa540ffb73829f04 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Aug 2023 15:54:01 +0800 Subject: [PATCH] fix(stream): do dome internal refactor. --- include/libs/stream/tstream.h | 47 ++++++++++------------- source/dnode/vnode/src/tq/tq.c | 4 +- source/dnode/vnode/src/tq/tqRestore.c | 4 +- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamExec.c | 45 ---------------------- source/libs/stream/src/streamMeta.c | 47 +++++++++++++++-------- source/libs/stream/src/streamTask.c | 39 ------------------- 7 files changed, 55 insertions(+), 133 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 169dcaf628..6456d2baa0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -258,11 +258,16 @@ typedef struct SStreamChildEpInfo { int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer } SStreamChildEpInfo; -typedef struct SStreamId { +typedef struct SStreamTaskKey { + int64_t streamId; + int64_t taskId; +} SStreamTaskKey; + +typedef struct SStreamTaskId { int64_t streamId; int32_t taskId; const char* idStr; -} SStreamId; +} SStreamTaskId; typedef struct SCheckpointInfo { int64_t checkpointId; @@ -275,7 +280,6 @@ typedef struct SStreamStatus { int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t schedStatus; int8_t keepTaskStatus; - bool transferState; bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it int8_t timerActive; // timer is active int8_t pauseAllowed; // allowed task status to be set to be paused @@ -317,7 +321,7 @@ typedef struct { struct SStreamTask { int64_t ver; - SStreamId id; + SStreamTaskId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; @@ -325,8 +329,8 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SHistDataRange dataRange; - SStreamId historyTaskId; - SStreamId streamTaskId; + SStreamTaskId historyTaskId; + SStreamTaskId streamTaskId; int32_t nextCheckId; SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; @@ -630,12 +634,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); @@ -655,7 +655,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); void streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); -int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); @@ -683,6 +682,10 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUp void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); +int32_t streamTaskReleaseState(SStreamTask* pTask); +int32_t streamTaskReloadState(SStreamTask* pTask); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); @@ -703,36 +706,28 @@ void streamMetaInit(); void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage); void streamMetaClose(SStreamMeta* streamMeta); - -// save to stream meta store -int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); -int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); - -// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); -int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); - -int32_t streamMetaBegin(SStreamMeta* pMeta); -int32_t streamMetaCommit(SStreamMeta* pMeta); -int32_t streamLoadTasks(SStreamMeta* pMeta); +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); +int32_t streamMetaCommit(SStreamMeta* pMeta); +int32_t streamLoadTasks(SStreamMeta* pMeta); +void streamMetaNotifyClose(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); -int32_t streamTaskReleaseState(SStreamTask* pTask); -int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int8_t isSucceed); -void streamMetaNotifyClose(SStreamMeta* pMeta); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3c3e6993d2..afae8131eb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1278,8 +1278,6 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr); ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - pTask->status.transferState = true; - streamSchedExec(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; @@ -1803,7 +1801,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // bool allStopped = true; // int32_t numOfCount = streamMetaGetNumOfTasks(pMeta); // for(int32_t i = 0; i < numOfCount; ++i) { -// SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); +// SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); // // int64_t keys1[2] = {pId->streamId, pId->taskId}; // SStreamTask** p = taosHashGet(pMeta->pTasks, keys1, sizeof(keys1)); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 7f543d1850..9cf5983228 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -74,7 +74,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { // broadcast the check downstream tasks msg for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; @@ -254,7 +254,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b40d0a1804..b9f167c39b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -267,7 +267,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); keys[0] = pId->streamId; keys[1] = pId->taskId; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1f2e7ea07d..e61f26fb89 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -499,51 +499,6 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return code; } -///** -// * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the -// * appropriate batch of blocks should be handled in 5 to 10 sec. -// */ -// int32_t streamExecForAll(SStreamTask* pTask) { -// const char* id = pTask->id.idStr; -// -// while (1) { -// int32_t batchSize = 0; -// SStreamQueueItem* pInput = NULL; -// -// // merge multiple input data if possible in the input queue. -// qDebug("s-task:%s start to extract data block from inputQ", id); -// -// /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); -// if (pInput == NULL) { -// ASSERT(batchSize == 0); -// if (pTask->info.fillHistory && pTask->status.transferState) { -// int32_t code = streamTransferStateToStreamTask(pTask); -// if (code != TSDB_CODE_SUCCESS) { // todo handle this -// return 0; -// } -// } -// -// break; -// } -// -// if (pTask->info.taskLevel == TASK_LEVEL__SINK) { -// ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); -// qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); -// streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); -// continue; -// } -// -// int64_t st = taosGetTimestampMs(); -// qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); -// -//// { -// // set input -// const SStreamQueueItem* pItem = pInput; -// qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); -// -// int64_t ver = pTask->chkInfo.checkpointVer; -// doSetStreamInputBlock(pTask, pInput, &ver, id); - /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 83b9570db1..ff97eac8e2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,6 +23,7 @@ #define META_HB_CHECK_INTERVAL 200 #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec +#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; @@ -39,8 +40,9 @@ SGStreamMetaMgt gStreamMetaMgt; static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta); static void metaHbToMnode(void* param, void* tmrId); static void streamMetaClear(SStreamMeta* pMeta); - -void streamMetaCloseImpl(void* arg); +static int32_t streamMetaBegin(SStreamMeta* pMeta); +static void streamMetaCloseImpl(void* arg); +static void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); @@ -105,7 +107,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { + if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; } @@ -124,7 +126,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } // task list - pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId)); + pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); if (pMeta->pTaskList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -311,7 +313,10 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { + int64_t key[2] = {0}; + extractStreamTaskKey(key, pTask); + + if (tdbTbUpsert(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) { qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno)); return -1; } @@ -320,12 +325,17 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { - int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn); +void extractStreamTaskKey(int64_t* pKey, const SStreamTask* pTask) { + pKey[0] = pTask->id.streamId; + pKey[1] = pTask->id.taskId; +} + +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int64_t* pKey) { + int32_t code = tdbTbDelete(pMeta->pTaskDb, pKey, STREAM_TASK_KEY_LEN, pMeta->txn); if (code != 0) { - qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno)); + qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pKey[1], tstrerror(terrno)); } else { - qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId); + qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pKey[1]); } return code; @@ -400,9 +410,9 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) } } -static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) { +static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) { for (int32_t i = 0; i < num; ++i) { - SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { taosArrayRemove(pMeta->pTaskList, i); break; @@ -460,7 +470,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ASSERT(pTask->status.timerActive == 0); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - streamMetaRemoveTask(pMeta, taskId); + streamMetaRemoveTask(pMeta, keys); streamMetaReleaseTask(pMeta, pTask); } else { qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); @@ -558,7 +568,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; - SArray* pRecycleList = taosArrayInit(4, sizeof(int32_t)); + SArray* pRecycleList = taosArrayInit(4, STREAM_TASK_KEY_LEN); tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { @@ -585,7 +595,10 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { int32_t taskId = pTask->id.taskId; tFreeStreamTask(pTask); - taosArrayPush(pRecycleList, &taskId); + int64_t key[2] = {0}; + extractStreamTaskKey(key, pTask); + + taosArrayPush(pRecycleList, key); int32_t total = taosArrayGetSize(pRecycleList); qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total); continue; @@ -628,8 +641,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { if (taosArrayGetSize(pRecycleList) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { - int32_t taskId = *(int32_t*)taosArrayGet(pRecycleList, i); - streamMetaRemoveTask(pMeta, taskId); + int64_t* pId = taosArrayGet(pRecycleList, i); + streamMetaRemoveTask(pMeta, pId); } } @@ -715,7 +728,7 @@ void metaHbToMnode(void* param, void* tmrId) { hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); int64_t keys[2] = {pId->streamId, pId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c506fb2306..01dcb435c0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -493,45 +493,6 @@ int32_t streamTaskStop(SStreamTask* pTask) { return 0; } -int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) { - const char* id = pTask->id.idStr; - int64_t stage = pTask->pMeta->stage; - int32_t vgId = pTask->pMeta->vgId; - - qDebug("s-task:%s vgId:%d restart current task, stage:%" PRId64 ", status:%s, sched-status:%d", id, vgId, stage, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - - // 1. stop task - streamTaskStop(pTask); - - // 2. clear state info - streamQueueCleanup(pTask->inputQueue); - streamQueueCleanup(pTask->outputInfo.queue); - taosArrayClear(pTask->checkReqIds); - taosArrayClear(pTask->pRspMsgList); - - // reset the upstream task stage info - streamTaskResetUpstreamStageInfo(pTask); - - pTask->status.downstreamReady = 0; - - // todo: handle the case when the task is in fill-history (step 1) phase - streamSetStatusNormal(pTask); - - taosWLockLatch(&pTask->pMeta->lock); - streamMetaSaveTask(pTask->pMeta, pTask); - streamMetaCommit(pTask->pMeta); - taosWUnLockLatch(&pTask->pMeta->lock); - - qDebug("s-task:%s vgId:%d restart completed", pTask->id.idStr, vgId); - - // 3. start to check the downstream status - if (startTask) { - streamTaskCheckDownstreamTasks(pTask); - } - return 0; -} - int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { char buf[512] = {0};