Merge pull request #24634 from taosdata/fix/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
bfbd61a02f
|
@ -343,7 +343,6 @@
|
|||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
|
||||
|
|
|
@ -23,8 +23,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
|
|
|
@ -210,7 +210,6 @@ void* qExtractReaderFromStreamScanner(void* scanner);
|
|||
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
|
||||
|
||||
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
|
||||
int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
|
||||
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
||||
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||
|
|
|
@ -628,17 +628,6 @@ typedef struct {
|
|||
int8_t igUntreated;
|
||||
} SStreamScanHistoryReq;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t downstreamTaskId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t childId;
|
||||
} SStreamScanHistoryFinishReq;
|
||||
|
||||
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
|
||||
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
|
||||
|
||||
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
|
@ -713,17 +702,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
|||
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
|
||||
void streamMetaClearHbMsg(SStreamHbMsg* pMsg);
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int32_t upstreamTaskId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t downstreamId;
|
||||
int32_t downstreamNode;
|
||||
} SStreamCompleteHistoryMsg;
|
||||
|
||||
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
|
||||
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
|
||||
|
||||
typedef struct SNodeUpdateInfo {
|
||||
int32_t nodeId;
|
||||
SEpSet prevEp;
|
||||
|
@ -820,7 +798,6 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
|||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock);
|
||||
|
||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn);
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
void streamTaskRestoreStatus(SStreamTask* pTask);
|
||||
|
||||
|
@ -829,7 +806,6 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
|
|||
SRpcHandleInfo* pRpcInfo, int32_t taskId);
|
||||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
|
@ -859,11 +835,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
|||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
|
||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||
|
||||
// agg level
|
||||
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
|
||||
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
|
||||
|
||||
// stream task meta
|
||||
void streamMetaInit();
|
||||
|
|
|
@ -86,8 +86,6 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
|
|
|
@ -828,8 +828,6 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -707,13 +707,6 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
|||
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
|
||||
void tFreeStreamObj(SStreamObj* pObj);
|
||||
|
||||
// typedef struct {
|
||||
// char streamName[TSDB_STREAM_FNAME_LEN];
|
||||
// int64_t uid;
|
||||
// int64_t streamUid;
|
||||
// SArray* childInfo; // SArray<SStreamChildEpInfo>
|
||||
// } SStreamCheckpointObj;
|
||||
|
||||
#define VIEW_TYPE_UPDATABLE (1 << 0)
|
||||
#define VIEW_TYPE_MATERIALIZED (1 << 1)
|
||||
|
||||
|
|
|
@ -33,6 +33,11 @@ typedef struct SStreamTransInfo {
|
|||
int32_t transId;
|
||||
} SStreamTransInfo;
|
||||
|
||||
typedef struct SVgroupChangeInfo {
|
||||
SHashObj *pDBMap;
|
||||
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
||||
} SVgroupChangeInfo;
|
||||
|
||||
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
|
||||
// to avoid too many checkpoints for a taskk in the waiting list
|
||||
typedef struct SCheckpointCandEntry {
|
||||
|
@ -94,25 +99,27 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
|
|||
|
||||
// for sma
|
||||
// TODO refactor
|
||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
|
||||
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
|
||||
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
||||
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
|
||||
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode);
|
||||
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode);
|
||||
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
|
||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
|
||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
|
||||
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||
|
||||
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
int32_t initStreamNodeList(SMnode *pMnode);
|
||||
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
|
||||
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
|
||||
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -29,10 +29,9 @@
|
|||
|
||||
#define MND_STREAM_MAX_NUM 60
|
||||
|
||||
typedef struct SVgroupChangeInfo {
|
||||
SHashObj *pDBMap;
|
||||
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
||||
} SVgroupChangeInfo;
|
||||
typedef struct SMStreamNodeCheckMsg {
|
||||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||
} SMStreamNodeCheckMsg;
|
||||
|
||||
static int32_t mndNodeCheckSentinel = 0;
|
||||
SStreamExecInfo execInfo;
|
||||
|
@ -60,14 +59,11 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
|||
|
||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||
|
||||
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
||||
|
||||
static void freeCheckpointCandEntry(void *);
|
||||
static void freeTaskList(void *param);
|
||||
|
||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
||||
static void freeCheckpointCandEntry(void *);
|
||||
static void freeTaskList(void *param);
|
||||
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
||||
|
@ -470,10 +466,8 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
tEncodeStreamTask(&encoder, pTask);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
STransAction action = {0};
|
||||
action.mTraceId = pTrans->mTraceId;
|
||||
initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
}
|
||||
|
@ -614,8 +608,6 @@ _OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
|
@ -627,24 +619,17 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
STransAction action = {0};
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// no valid epset, return directly without redoAction
|
||||
if (!hasEpset) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
@ -752,17 +737,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_CREATE_NAME);
|
||||
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
|
||||
|
||||
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -808,6 +784,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
mndTransDrop(pTrans);
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
mDebug("stream tasks register into node list");
|
||||
saveStreamTasksInfo(&streamObj, &execInfo);
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
@ -940,20 +917,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, MND_STREAM_CHECKPOINT_NAME);
|
||||
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream");
|
||||
if (pTrans == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
||||
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
|
||||
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
|
||||
goto _ERR;
|
||||
}
|
||||
|
||||
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
|
||||
mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
|
@ -970,27 +941,26 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
if (pVgObj == NULL) {
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
goto _ERR;
|
||||
}
|
||||
|
||||
void *buf;
|
||||
int32_t tlen;
|
||||
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
|
||||
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
goto _ERR;
|
||||
}
|
||||
|
||||
STransAction act = {0};
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
|
||||
taosMemoryFree(buf);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
goto _ERR;
|
||||
}
|
||||
|
||||
initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
|
||||
if (mndTransAppendRedoAction(pTrans, &act) != 0) {
|
||||
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
|
||||
TSDB_CODE_SYN_PROPOSE_NOT_READY);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
goto _ERR;
|
||||
|
@ -1219,7 +1189,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
|
||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
|
@ -1227,16 +1197,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
|
||||
|
||||
// drop all tasks
|
||||
|
@ -1563,18 +1523,6 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getNumOfTasks(SArray *pTaskList) {
|
||||
int32_t numOfLevels = taosArrayGetSize(pTaskList);
|
||||
|
||||
int32_t count = 0;
|
||||
for (int32_t i = 0; i < numOfLevels; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pTaskList, i);
|
||||
count += taosArrayGetSize(pLevel);
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -1590,7 +1538,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
// lock
|
||||
taosRLockLatch(&pStream->lock);
|
||||
|
||||
int32_t count = getNumOfTasks(pStream->tasks);
|
||||
int32_t count = mndGetNumOfStreamTasks(pStream);
|
||||
if (numOfRows + count > rowsCapacity) {
|
||||
blockDataEnsureCapacity(pBlock, numOfRows + count);
|
||||
}
|
||||
|
@ -1623,21 +1571,6 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) {
|
||||
taosWLockLatch(&pStream->lock);
|
||||
pStream->status = status;
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
@ -1652,9 +1585,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
if (pStream == NULL) {
|
||||
if (pauseReq.igNotExists) {
|
||||
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
|
||||
mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
|
||||
return 0;
|
||||
} else {
|
||||
mError("stream:%s not exist, failed to pause stream", pauseReq.name);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
@ -1683,26 +1617,17 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_PAUSE_NAME);
|
||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
|
||||
|
||||
// if nodeUpdate happened, not send pause trans
|
||||
if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -1710,12 +1635,18 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// pause stream
|
||||
if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) {
|
||||
taosWLockLatch(&pStream->lock);
|
||||
pStream->status = STREAM_STATUS__PAUSE;
|
||||
if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) {
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
|
@ -1743,10 +1674,11 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
if (pStream == NULL) {
|
||||
if (pauseReq.igNotExists) {
|
||||
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
|
||||
mInfo("stream:%s not exist, not resume stream", pauseReq.name);
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return 0;
|
||||
} else {
|
||||
mError("stream:%s not exist, failed to resume stream", pauseReq.name);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
@ -1769,26 +1701,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME);
|
||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
|
||||
|
||||
// resume all tasks
|
||||
if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
// set the resume action
|
||||
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -1796,12 +1719,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// resume stream
|
||||
if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) {
|
||||
taosWLockLatch(&pStream->lock);
|
||||
pStream->status = STREAM_STATUS__NORMAL;
|
||||
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
|
@ -1815,91 +1743,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
|
||||
int32_t transId) {
|
||||
pMsg->streamId = pId->streamId;
|
||||
pMsg->taskId = pId->taskId;
|
||||
pMsg->transId = transId;
|
||||
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
|
||||
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
|
||||
}
|
||||
|
||||
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
|
||||
SStreamTaskId *pId, int32_t transId) {
|
||||
SStreamTaskNodeUpdateMsg req = {0};
|
||||
initNodeUpdateMsg(&req, pInfo, pId, transId);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t blen;
|
||||
|
||||
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tlen = sizeof(SMsgHead) + blen;
|
||||
|
||||
void *buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
tEncodeStreamTaskUpdateMsg(&encoder, &req);
|
||||
|
||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||
pMsgHead->contLen = htonl(tlen);
|
||||
pMsgHead->vgId = htonl(nodeId);
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
*pBuf = buf;
|
||||
*pLen = tlen;
|
||||
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo extract method: traverse stream tasks
|
||||
// build trans to update the epset
|
||||
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
||||
mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid);
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
|
||||
|
||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||
for (int32_t k = 0; k < numOfTasks; ++k) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, k);
|
||||
|
||||
void *pBuf = NULL;
|
||||
int32_t len = 0;
|
||||
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pBuf);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
|
||||
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
|
||||
const SEp *p = GET_ACTIVE_EP(pCurrent);
|
||||
|
@ -1997,7 +1840,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
return terrno;
|
||||
}
|
||||
|
||||
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
|
||||
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
|
||||
}
|
||||
|
||||
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||
|
@ -2011,7 +1854,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
||||
pStream->name, pTrans->id);
|
||||
|
||||
int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
|
||||
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans);
|
||||
|
||||
// todo: not continue, drop all and retry again
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2171,26 +2014,6 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// kill all trans in the dst DB
|
||||
static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||
mDebug("start to clear checkpoints in all Dbs");
|
||||
|
||||
void *pIter = NULL;
|
||||
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
|
||||
char *pDb = (char *)pIter;
|
||||
|
||||
size_t len = 0;
|
||||
void *pKey = taosHashGetKey(pDb, &len);
|
||||
char *p = strndup(pKey, len);
|
||||
|
||||
mDebug("clear checkpoint trans in Db:%s", p);
|
||||
doKillCheckpointTrans(pMnode, pKey, len);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
mDebug("complete clear checkpoints in Dbs");
|
||||
}
|
||||
|
||||
// this function runs by only one thread, so it is not multi-thread safe
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
@ -2260,10 +2083,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
typedef struct SMStreamNodeCheckMsg {
|
||||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||
} SMStreamNodeCheckMsg;
|
||||
|
||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -2335,26 +2154,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
|||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||
}
|
||||
|
||||
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
||||
// data in the hash table will be removed automatically, no need to remove it here.
|
||||
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
|
||||
if (pTransInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// not checkpoint trans, ignore
|
||||
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char *pDupDBName = strndup(pDBName, len);
|
||||
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
||||
taosMemoryFree(pDupDBName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void freeCheckpointCandEntry(void *param) {
|
||||
SCheckpointCandEntry *pEntry = param;
|
||||
taosMemoryFreeClear(pEntry->pName);
|
||||
|
@ -2402,8 +2201,15 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||
if (pStream == NULL) {
|
||||
mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||
SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||
if (pReqTaskList == NULL) {
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
|
|
|
@ -65,6 +65,8 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
|
|||
return terrno;
|
||||
}
|
||||
|
||||
/*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
|
||||
|
||||
|
@ -92,19 +94,13 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
|
|||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
|
||||
taosMemoryFree(pReq);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!hasEpset) {
|
||||
taosMemoryFree(pReq);
|
||||
continue;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
mndTransDrop(pTrans);
|
||||
|
|
|
@ -169,7 +169,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
|
|||
return NULL;
|
||||
}
|
||||
|
||||
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
|
||||
mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
|
@ -255,11 +255,132 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode) {
|
||||
pAction->epSet = *pEpset;
|
||||
pAction->contLen = contLen;
|
||||
pAction->pCont = pCont;
|
||||
pAction->msgType = msgType;
|
||||
pAction->retryCode = retryCode;
|
||||
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode) {
|
||||
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode};
|
||||
return mndTransAppendRedoAction(pTrans, &action);
|
||||
}
|
||||
|
||||
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
||||
// data in the hash table will be removed automatically, no need to remove it here.
|
||||
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
|
||||
if (pTransInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// not checkpoint trans, ignore
|
||||
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
|
||||
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char *pDupDBName = strndup(pDBName, len);
|
||||
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
||||
taosMemoryFree(pDupDBName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// kill all trans in the dst DB
|
||||
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||
mDebug("start to clear checkpoints in all Dbs");
|
||||
|
||||
void *pIter = NULL;
|
||||
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
|
||||
char *pDb = (char *)pIter;
|
||||
|
||||
size_t len = 0;
|
||||
void *pKey = taosHashGetKey(pDb, &len);
|
||||
char *p = strndup(pKey, len);
|
||||
|
||||
mDebug("clear checkpoint trans in Db:%s", p);
|
||||
doKillCheckpointTrans(pMnode, pKey, len);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
mDebug("complete clear checkpoints in Dbs");
|
||||
}
|
||||
|
||||
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
|
||||
int32_t transId) {
|
||||
pMsg->streamId = pId->streamId;
|
||||
pMsg->taskId = pId->taskId;
|
||||
pMsg->transId = transId;
|
||||
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
|
||||
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
|
||||
}
|
||||
|
||||
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
|
||||
SStreamTaskId *pId, int32_t transId) {
|
||||
SStreamTaskNodeUpdateMsg req = {0};
|
||||
initNodeUpdateMsg(&req, pInfo, pId, transId);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t blen;
|
||||
|
||||
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tlen = sizeof(SMsgHead) + blen;
|
||||
|
||||
void *buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
tEncodeStreamTaskUpdateMsg(&encoder, &req);
|
||||
|
||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||
pMsgHead->contLen = htonl(tlen);
|
||||
pMsgHead->vgId = htonl(nodeId);
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
*pBuf = buf;
|
||||
*pLen = tlen;
|
||||
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo extract method: traverse stream tasks
|
||||
// build trans to update the epset
|
||||
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
||||
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
|
||||
|
||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||
for (int32_t k = 0; k < numOfTasks; ++k) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, k);
|
||||
|
||||
void *pBuf = NULL;
|
||||
int32_t len = 0;
|
||||
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
||||
|
||||
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(pBuf);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
return 0;
|
||||
}
|
|
@ -143,7 +143,7 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
|
||||
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
|
||||
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
|
||||
|
@ -160,15 +160,14 @@ static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *p
|
|||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
@ -201,14 +200,14 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
|
|||
return num;
|
||||
}
|
||||
|
||||
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t size = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
|
||||
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -220,7 +219,7 @@ int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
|
||||
|
@ -233,32 +232,25 @@ static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pT
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SEpSet epset = {0};
|
||||
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// no valid epset, return directly without redoAction
|
||||
if (!hasEpset) {
|
||||
taosMemoryFree(pReq);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
|
||||
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SArray *tasks = pStream->tasks;
|
||||
|
||||
int32_t size = taosArrayGetSize(tasks);
|
||||
|
@ -267,7 +259,8 @@ int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) {
|
||||
|
||||
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -171,10 +171,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_RSP: // 1036
|
||||
break;
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
|
||||
return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_CHECK:
|
||||
return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg);
|
||||
case TDMT_VND_STREAM_TASK_CHECK_RSP:
|
||||
|
|
|
@ -267,8 +267,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
// sma
|
||||
int32_t smaInit();
|
||||
|
|
|
@ -1043,15 +1043,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
// only the agg tasks and the sink tasks will receive this message from upstream tasks
|
||||
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
|||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||
static bool taskReadyForDataFromWal(SStreamTask* pTask);
|
||||
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
|
||||
static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
|
||||
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
|
||||
|
||||
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
||||
|
@ -300,21 +300,21 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
|
||||
bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t numOfNewItems = 0;
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
|
||||
*numOfItems += numOfNewItems;
|
||||
return numOfNewItems > 0;
|
||||
}
|
||||
|
||||
SStreamQueueItem* pItem = NULL;
|
||||
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
|
||||
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
|
||||
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
|
||||
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
|
||||
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
|
||||
if (itemInFillhistory) {
|
||||
numOfNewItems += 1;
|
||||
}
|
||||
|
@ -334,7 +334,9 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num
|
|||
break;
|
||||
}
|
||||
} else {
|
||||
tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
|
||||
walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
|
||||
tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id,
|
||||
pTask->chkInfo.nextProcessVer);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -399,7 +401,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems);
|
||||
bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
if ((numOfItems > 0) || hasNewData) {
|
||||
|
|
|
@ -328,74 +328,6 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
// deserialize
|
||||
SStreamScanHistoryFinishReq req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||
req.downstreamTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);
|
||||
|
||||
int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
// deserialize
|
||||
SStreamCompleteHistoryMsg req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
tDecodeCompleteHistoryDataMsg(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||
tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower",
|
||||
req.upstreamTaskId, pMeta->vgId);
|
||||
return TASK_DOWNSTREAM_NOT_LEADER;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||
req.upstreamTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
|
||||
if (remain > 0) {
|
||||
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
|
||||
pTask->id.idStr, req.downstreamId, remain);
|
||||
} else {
|
||||
tqDebug(
|
||||
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
|
||||
"completed msg",
|
||||
pTask->id.idStr, req.downstreamId);
|
||||
code = streamProcessScanHistoryFinishRsp(pTask);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
char* msgStr = pMsg->pCont;
|
||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
|
|
|
@ -789,10 +789,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
|
||||
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||
default:
|
||||
|
|
|
@ -1027,57 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
|
||||
while (1) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
|
||||
|
||||
pSup->calTriggerSaved = 0;
|
||||
pSup->deleteMarkSaved = 0;
|
||||
qInfo("reset stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
|
||||
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
|
||||
|
||||
pSup->calTriggerSaved = 0;
|
||||
pSup->deleteMarkSaved = 0;
|
||||
qInfo("reset stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
|
||||
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
|
||||
|
||||
pSup->calTriggerSaved = 0;
|
||||
pSup->deleteMarkSaved = 0;
|
||||
qInfo("reset stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
|
||||
|
||||
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) {
|
||||
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
|
||||
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
|
||||
|
||||
pSup->calTriggerSaved = 0;
|
||||
pSup->deleteMarkSaved = 0;
|
||||
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
|
||||
}
|
||||
|
||||
// iterate operator tree
|
||||
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
|
||||
return 0;
|
||||
} else {
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
const char* id = GET_TASKID(pTaskInfo);
|
||||
|
|
|
@ -123,8 +123,6 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
|
|||
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
|
||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||
int32_t* pLen);
|
||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||
|
||||
void streamClearChkptReadyMsg(SStreamTask* pTask);
|
||||
|
@ -134,10 +132,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
|||
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
||||
const char* streamQueueItemGetTypeStr(int32_t type);
|
||||
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||
|
||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||
|
||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||
|
|
|
@ -34,9 +34,6 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p
|
|||
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
|
||||
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
|
||||
int32_t vgSz, int64_t groupId);
|
||||
static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
||||
SEpSet* pEpSet);
|
||||
|
||||
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
|
||||
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
|
||||
|
||||
|
@ -676,41 +673,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||
SStreamScanHistoryFinishReq req = {
|
||||
.streamId = pTask->id.streamId,
|
||||
.childId = pTask->info.selfChildId,
|
||||
.upstreamTaskId = pTask->id.taskId,
|
||||
.upstreamNodeId = pTask->pMeta->vgId,
|
||||
};
|
||||
|
||||
// serialize
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||
pTask->notReadyTasks = 1;
|
||||
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
|
||||
&pTask->outputInfo.fixedDispatcher.epSet);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||
pTask->notReadyTasks = numOfVgs;
|
||||
|
||||
SStreamTaskState* pState = streamTaskGetStatus(pTask);
|
||||
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
|
||||
pTask->id.idStr, numOfVgs, pState->name);
|
||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
req.downstreamTaskId = pVgInfo->taskId;
|
||||
doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
|
||||
streamProcessScanHistoryFinishRsp(pTask);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// this function is usually invoked by sink/agg task
|
||||
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
||||
int32_t num = taosArrayGetSize(pTask->pReadyMsgList);
|
||||
|
@ -782,48 +744,6 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
|
||||
SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
int32_t tlen;
|
||||
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
|
||||
if (buf) {
|
||||
rpcFreeCont(buf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
|
||||
SStreamTaskState* pState = streamTaskGetStatus(pTask);
|
||||
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name,
|
||||
pReq->downstreamTaskId, vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
|
@ -989,109 +909,6 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
|
|||
taosArrayClear(pTask->pReadyMsgList);
|
||||
}
|
||||
|
||||
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||
int32_t* pLen) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
SEncoder encoder;
|
||||
|
||||
SStreamCompleteHistoryMsg msg = {
|
||||
.streamId = pReq->streamId,
|
||||
.upstreamTaskId = pReq->upstreamTaskId,
|
||||
.upstreamNodeId = pReq->upstreamNodeId,
|
||||
.downstreamId = pReq->downstreamTaskId,
|
||||
.downstreamNode = pTask->pMeta->vgId,
|
||||
};
|
||||
|
||||
tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
|
||||
if (code < 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
if (pBuf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
|
||||
|
||||
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
||||
|
||||
tEncoderInit(&encoder, (uint8_t*)abuf, len);
|
||||
tEncodeCompleteHistoryDataMsg(&encoder, &msg);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
*pBuffer = pBuf;
|
||||
*pLen = len;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
|
||||
void* pBuf = NULL;
|
||||
int32_t len = 0;
|
||||
|
||||
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
|
||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
|
||||
|
||||
SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
|
||||
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
|
||||
info.msg.info = *pRpcInfo;
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
||||
if (pTask->pRspMsgList == NULL) {
|
||||
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
|
||||
}
|
||||
taosArrayPush(pTask->pRspMsgList, &info);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
|
||||
stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
|
||||
num);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
|
||||
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
|
||||
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
|
||||
tmsgSendRsp(&pInfo->msg);
|
||||
|
||||
stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId);
|
||||
}
|
||||
|
||||
taosArrayClear(pTask->pRspMsgList);
|
||||
stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// this message has been sent successfully, let's try next one.
|
||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
||||
|
|
|
@ -592,108 +592,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (qStreamRecoverFinish(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
|
||||
SRpcHandleInfo* pRpcInfo) {
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
|
||||
|
||||
const char* id = pTask->id.idStr;
|
||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||
|
||||
if (p->state != TASK_STATUS__SCAN_HISTORY) {
|
||||
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id,
|
||||
p->name, pReq->upstreamTaskId);
|
||||
|
||||
void* pBuf = NULL;
|
||||
int32_t len = 0;
|
||||
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
|
||||
|
||||
SRpcMsg msg = {.info = *pRpcInfo};
|
||||
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
|
||||
|
||||
tmsgSendRsp(&msg);
|
||||
stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel,
|
||||
pReq->upstreamTaskId, pReq->upstreamNodeId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// sink tasks do not send end of scan history msg to its upstream, which is agg task.
|
||||
streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
|
||||
|
||||
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
|
||||
ASSERT(left >= 0);
|
||||
|
||||
if (left == 0) {
|
||||
int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
if (taskLevel == TASK_LEVEL__AGG) {
|
||||
stDebug(
|
||||
"s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing "
|
||||
"and send rsp to all upstream tasks",
|
||||
id, numOfTasks);
|
||||
streamAggUpstreamScanHistoryFinish(pTask);
|
||||
} else {
|
||||
stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id,
|
||||
numOfTasks);
|
||||
}
|
||||
|
||||
// all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data
|
||||
// from the WAL files, which contains the real time stream data.
|
||||
streamNotifyUpstreamContinue(pTask);
|
||||
|
||||
// mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks.
|
||||
if (taskLevel == TASK_LEVEL__AGG) {
|
||||
/*int32_t code = */ streamTaskScanHistoryDataComplete(pTask);
|
||||
} else { // for sink task, set normal
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id,
|
||||
pReq->upstreamTaskId, pReq->childId, left);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
|
||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||
|
||||
// task restart now, not handle the scan-history finish rsp
|
||||
if (status == TASK_STATUS__UNINIT) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
// execute in the scan history complete call back msg, ready to process data from inputQ
|
||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
streamTaskSetSchedStatusInactive(pTask);
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
streamMetaSaveTask(pMeta, pTask);
|
||||
streamMetaCommit(pMeta);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// for source tasks, let's continue execute.
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
|
||||
SDataRange* pRange = &pHTask->dataRange;
|
||||
|
||||
|
@ -946,29 +844,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// restore param
|
||||
int32_t code = 0;
|
||||
if (pTask->info.fillHistory) {
|
||||
code = streamRestoreParam(pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch scan-history finish req to all related downstream task
|
||||
code = streamDispatchScanHistoryFinishMsg(pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
return qStreamInfoResetTimewindowFilter(exec);
|
||||
|
@ -1072,28 +947,6 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
||||
SDataRange* pRange = &pTask->dataRange;
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ class TDCom:
|
|||
self.stream_suffix = "_stream"
|
||||
self.range_count = 5
|
||||
self.default_interval = 5
|
||||
self.stream_timeout = 12
|
||||
self.stream_timeout = 60
|
||||
self.create_stream_sleep = 0.5
|
||||
self.record_history_ts = str()
|
||||
self.precision = "ms"
|
||||
|
@ -1688,8 +1688,8 @@ class TDCom:
|
|||
res1 = self.round_handle(res1)
|
||||
res2 = self.round_handle(res2)
|
||||
if latency < self.stream_timeout:
|
||||
latency += 0.2
|
||||
time.sleep(0.2)
|
||||
latency += 0.5
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
if latency == 0:
|
||||
return False
|
||||
|
|
Loading…
Reference in New Issue