From 7b0891981ecfcbc1a16843a6e92a9c0dbabe9474 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 8 Nov 2023 14:14:01 +0800 Subject: [PATCH] merge 3.0 --- include/libs/stream/tstream.h | 105 ++++++++++++++-------------- source/libs/stream/src/streamMeta.c | 19 +++-- 2 files changed, 62 insertions(+), 62 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e4bca14a9e..b41fc75782 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -39,16 +39,16 @@ extern "C" { #define TASK_DOWNSTREAM_NOT_LEADER 0x2 #define TASK_SELF_NEW_STAGE 0x3 -#define TASK_DOWNSTREAM_READY 0x0 -#define TASK_DOWNSTREAM_NOT_READY 0x1 -#define TASK_DOWNSTREAM_NOT_LEADER 0x2 -#define TASK_UPSTREAM_NEW_STAGE 0x3 +#define TASK_DOWNSTREAM_READY 0x0 +#define TASK_DOWNSTREAM_NOT_READY 0x1 +#define TASK_DOWNSTREAM_NOT_LEADER 0x2 +#define TASK_UPSTREAM_NEW_STAGE 0x3 -#define NODE_ROLE_UNINIT 0x1 -#define NODE_ROLE_LEADER 0x2 -#define NODE_ROLE_FOLLOWER 0x3 +#define NODE_ROLE_UNINIT 0x1 +#define NODE_ROLE_LEADER 0x2 +#define NODE_ROLE_FOLLOWER 0x3 -#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0) +#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0) #define CLEAR_RELATED_FILLHISTORY_TASK(_t) \ do { \ (_t)->hTaskInfo.id.taskId = 0; \ @@ -378,7 +378,7 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task + int64_t haltVer; // offset in wal when halt the stream task } SHistoryTaskInfo; typedef struct STaskOutputInfo { @@ -442,7 +442,7 @@ typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; int32_t startAllTasksFlag; - SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing + SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; } STaskStartInfo; @@ -453,32 +453,32 @@ typedef struct STaskUpdateInfo { // meta typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasksMap; - SArray* pTaskList; // SArray - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - int64_t stage; - int32_t role; - STaskStartInfo startInfo; - SRWLatch lock; - int32_t walScanCounter; - void* streamBackend; - int64_t streamBackendRid; - SHashObj* pTaskDbUnique; - TdThreadMutex backendMutex; - SMetaHbInfo* pHbInfo; + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasksMap; + SArray* pTaskList; // SArray + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + int64_t stage; + int32_t role; + STaskStartInfo startInfo; + SRWLatch lock; + int32_t walScanCounter; + void* streamBackend; + int64_t streamBackendRid; + SHashObj* pTaskDbUnique; + TdThreadMutex backendMutex; + SMetaHbInfo* pHbInfo; STaskUpdateInfo updateInfo; - SHashObj* pUpdateTaskSet; - int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta - int32_t numOfPausedTasks; - int32_t chkptNotReadyTasks; - int64_t rid; + SHashObj* pUpdateTaskSet; + int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta + int32_t numOfPausedTasks; + int32_t chkptNotReadyTasks; + int64_t rid; int64_t chkpId; int32_t chkpCap; @@ -486,6 +486,7 @@ typedef struct SStreamMeta { SArray* chkpInUse; SRWLatch chkpDirLock; + void* qHandle; int32_t pauseTaskNum; } SStreamMeta; @@ -658,8 +659,8 @@ typedef struct STaskStatusEntry { typedef struct SStreamHbMsg { int32_t vgId; int32_t numOfTasks; - SArray* pTaskStatus; // SArray - SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. + SArray* pTaskStatus; // SArray + SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. } SStreamHbMsg; int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); @@ -683,7 +684,7 @@ typedef struct SNodeUpdateInfo { } SNodeUpdateInfo; typedef struct SStreamTaskNodeUpdateMsg { - int32_t transId; // to identify the msg + int32_t transId; // to identify the msg int64_t streamId; int32_t taskId; SArray* pNodeList; // SArray @@ -740,7 +741,7 @@ const char* streamTaskGetStatusStr(ETaskStatus status); void streamTaskResetStatus(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask); -void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); +void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); @@ -771,18 +772,18 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common -int32_t streamRestoreParam(SStreamTask* pTask); -void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); -void streamTaskResume(SStreamTask* pTask); -void streamTaskEnablePause(SStreamTask* pTask); -int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); -void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); -void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); -void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); -int32_t streamTaskReleaseState(SStreamTask* pTask); -int32_t streamTaskReloadState(SStreamTask* pTask); -void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); -void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); +int32_t streamRestoreParam(SStreamTask* pTask); +void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); +void streamTaskResume(SStreamTask* pTask); +void streamTaskEnablePause(SStreamTask* pTask); +int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); +void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); +int32_t streamTaskReleaseState(SStreamTask* pTask); +int32_t streamTaskReloadState(SStreamTask* pTask); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); @@ -815,7 +816,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char *key); +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b6c973f0d0..5e5166cc34 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -18,6 +18,7 @@ #include "streamInt.h" #include "tmisce.h" #include "tref.h" +#include "tsched.h" #include "tstream.h" #include "ttimer.h" #include "wal.h" @@ -227,7 +228,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char *key) { +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { SStreamTask* pTask = arg; int64_t chkpId = pTask->checkpointingId; @@ -320,8 +321,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; - - pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); // pMeta->chkpId = streamGetLatestCheckpointId(pMeta); @@ -350,10 +349,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->stopFlag = 0; - + pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); return pMeta; - _err: +_err: taosMemoryFree(pMeta->path); if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -483,7 +482,7 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->pUpdateTaskSet); - //taosHashCleanup(pMeta->pTaskBackendUnique); + // taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); @@ -1144,7 +1143,9 @@ void metaHbToMnode(void* param, void* tmrId) { } tEncoderClear(&encoder); - SRpcMsg msg = {.info.noResp = 1,}; + SRpcMsg msg = { + .info.noResp = 1, + }; initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); pMeta->pHbInfo->hbCount += 1; @@ -1156,7 +1157,7 @@ void metaHbToMnode(void* param, void* tmrId) { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } - _end: +_end: clearHbMsg(&hbMsg, pIdList); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); @@ -1251,7 +1252,6 @@ void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-runlock", pMeta->vgId); taosRUnLockLatch(&pMeta->lock); - } void streamMetaWLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wlock", pMeta->vgId); @@ -1261,4 +1261,3 @@ void streamMetaWUnLock(SStreamMeta* pMeta) { stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosWUnLockLatch(&pMeta->lock); } -