merge 3.0

This commit is contained in:
yihaoDeng 2023-11-08 14:14:01 +08:00
parent fb77d811f9
commit 7b0891981e
2 changed files with 62 additions and 62 deletions

View File

@ -39,16 +39,16 @@ extern "C" {
#define TASK_DOWNSTREAM_NOT_LEADER 0x2 #define TASK_DOWNSTREAM_NOT_LEADER 0x2
#define TASK_SELF_NEW_STAGE 0x3 #define TASK_SELF_NEW_STAGE 0x3
#define TASK_DOWNSTREAM_READY 0x0 #define TASK_DOWNSTREAM_READY 0x0
#define TASK_DOWNSTREAM_NOT_READY 0x1 #define TASK_DOWNSTREAM_NOT_READY 0x1
#define TASK_DOWNSTREAM_NOT_LEADER 0x2 #define TASK_DOWNSTREAM_NOT_LEADER 0x2
#define TASK_UPSTREAM_NEW_STAGE 0x3 #define TASK_UPSTREAM_NEW_STAGE 0x3
#define NODE_ROLE_UNINIT 0x1 #define NODE_ROLE_UNINIT 0x1
#define NODE_ROLE_LEADER 0x2 #define NODE_ROLE_LEADER 0x2
#define NODE_ROLE_FOLLOWER 0x3 #define NODE_ROLE_FOLLOWER 0x3
#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0) #define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0)
#define CLEAR_RELATED_FILLHISTORY_TASK(_t) \ #define CLEAR_RELATED_FILLHISTORY_TASK(_t) \
do { \ do { \
(_t)->hTaskInfo.id.taskId = 0; \ (_t)->hTaskInfo.id.taskId = 0; \
@ -378,7 +378,7 @@ typedef struct SHistoryTaskInfo {
int32_t tickCount; int32_t tickCount;
int32_t retryTimes; int32_t retryTimes;
int32_t waitInterval; int32_t waitInterval;
int64_t haltVer; // offset in wal when halt the stream task int64_t haltVer; // offset in wal when halt the stream task
} SHistoryTaskInfo; } SHistoryTaskInfo;
typedef struct STaskOutputInfo { typedef struct STaskOutputInfo {
@ -442,7 +442,7 @@ typedef struct STaskStartInfo {
int64_t startTs; int64_t startTs;
int64_t readyTs; int64_t readyTs;
int32_t startAllTasksFlag; int32_t startAllTasksFlag;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime; int32_t elapsedTime;
} STaskStartInfo; } STaskStartInfo;
@ -453,32 +453,32 @@ typedef struct STaskUpdateInfo {
// meta // meta
typedef struct SStreamMeta { typedef struct SStreamMeta {
char* path; char* path;
TDB* db; TDB* db;
TTB* pTaskDb; TTB* pTaskDb;
TTB* pCheckpointDb; TTB* pCheckpointDb;
SHashObj* pTasksMap; SHashObj* pTasksMap;
SArray* pTaskList; // SArray<STaskId*> SArray* pTaskList; // SArray<STaskId*>
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
int64_t stage; int64_t stage;
int32_t role; int32_t role;
STaskStartInfo startInfo; STaskStartInfo startInfo;
SRWLatch lock; SRWLatch lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
int64_t streamBackendRid; int64_t streamBackendRid;
SHashObj* pTaskDbUnique; SHashObj* pTaskDbUnique;
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo; SMetaHbInfo* pHbInfo;
STaskUpdateInfo updateInfo; STaskUpdateInfo updateInfo;
SHashObj* pUpdateTaskSet; SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks; int32_t numOfPausedTasks;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t rid; int64_t rid;
int64_t chkpId; int64_t chkpId;
int32_t chkpCap; int32_t chkpCap;
@ -486,6 +486,7 @@ typedef struct SStreamMeta {
SArray* chkpInUse; SArray* chkpInUse;
SRWLatch chkpDirLock; SRWLatch chkpDirLock;
void* qHandle;
int32_t pauseTaskNum; int32_t pauseTaskNum;
} SStreamMeta; } SStreamMeta;
@ -658,8 +659,8 @@ typedef struct STaskStatusEntry {
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {
int32_t vgId; int32_t vgId;
int32_t numOfTasks; int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry> SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes. SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg; } SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
@ -683,7 +684,7 @@ typedef struct SNodeUpdateInfo {
} SNodeUpdateInfo; } SNodeUpdateInfo;
typedef struct SStreamTaskNodeUpdateMsg { typedef struct SStreamTaskNodeUpdateMsg {
int32_t transId; // to identify the msg int32_t transId; // to identify the msg
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo> SArray* pNodeList; // SArray<SNodeUpdateInfo>
@ -740,7 +741,7 @@ const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask); void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
// recover and fill history // recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstream(SStreamTask* pTask);
@ -771,18 +772,18 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common // common
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
@ -815,7 +816,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char *key); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);

View File

@ -18,6 +18,7 @@
#include "streamInt.h" #include "streamInt.h"
#include "tmisce.h" #include "tmisce.h"
#include "tref.h" #include "tref.h"
#include "tsched.h"
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
@ -227,7 +228,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0; return 0;
} }
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char *key) { int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
SStreamTask* pTask = arg; SStreamTask* pTask = arg;
int64_t chkpId = pTask->checkpointingId; int64_t chkpId = pTask->checkpointingId;
@ -320,8 +321,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
pMeta->stage = stage; pMeta->stage = stage;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta); // pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
@ -350,10 +349,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0; pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
return pMeta; return pMeta;
_err: _err:
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap); if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@ -483,7 +482,7 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique); taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->pUpdateTaskSet);
//taosHashCleanup(pMeta->pTaskBackendUnique); // taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
@ -1144,7 +1143,9 @@ void metaHbToMnode(void* param, void* tmrId) {
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1,}; SRpcMsg msg = {
.info.noResp = 1,
};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1; pMeta->pHbInfo->hbCount += 1;
@ -1156,7 +1157,7 @@ void metaHbToMnode(void* param, void* tmrId) {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
} }
_end: _end:
clearHbMsg(&hbMsg, pIdList); clearHbMsg(&hbMsg, pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
@ -1251,7 +1252,6 @@ void streamMetaRLock(SStreamMeta* pMeta) {
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);
@ -1261,4 +1261,3 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId); stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} }