Merge pull request #22135 from taosdata/fix/3_liaohj
refactor: do some internal refactor.

commit 321be1f83f

@@ -45,8 +45,8 @@ enum {
   TASK_STATUS__FAIL,
   TASK_STATUS__STOP,
   TASK_STATUS__SCAN_HISTORY,  // stream task scan history data by using tsdbread in the stream scanner
-  TASK_STATUS__HALT,          // stream task will handle all data in the input queue, and then paused
-  TASK_STATUS__PAUSE,
+  TASK_STATUS__HALT,          // stream task will handle all data in the input queue, and then paused, todo remove it?
+  TASK_STATUS__PAUSE,         // pause
 };

 enum {

@@ -272,6 +272,7 @@ typedef struct SStreamStatus {
   int8_t keepTaskStatus;
   bool   transferState;
   int8_t timerActive;   // timer is active
+  int8_t pauseAllowed;  // allowed task status to be set to be paused
 } SStreamStatus;

 typedef struct SHistDataRange {

@@ -296,15 +297,15 @@ typedef struct SDispatchMsgInfo {
 } SDispatchMsgInfo;

 typedef struct {
-  int8_t        outputType;
-  int8_t        outputStatus;
-  SStreamQueue* outputQueue;
-} SSTaskOutputInfo;
+  int8_t        type;
+  int8_t        status;
+  SStreamQueue* queue;
+} STaskOutputInfo;

 struct SStreamTask {
   SStreamId        id;
   SSTaskBasicInfo  info;
-  int8_t           outputType;
+  STaskOutputInfo  outputInfo;
   SDispatchMsgInfo msgInfo;
   SStreamStatus    status;
   SCheckpointInfo  chkInfo;

@@ -315,7 +316,7 @@ struct SStreamTask {
   SArray* pUpstreamEpInfoList;  // SArray<SStreamChildEpInfo*>, // children info
   int32_t nextCheckId;
   SArray* checkpointInfo;  // SArray<SStreamCheckpointInfo>
-
+  int64_t initTs;
   // output
   union {
     STaskDispatcherFixedEp fixedEpDispatcher;

@@ -326,9 +327,7 @@
   };

   int8_t        inputStatus;
-  int8_t        outputStatus;
   SStreamQueue* inputQueue;
-  SStreamQueue* outputQueue;

   // trigger
   int8_t        triggerStatus;

@@ -337,6 +336,8 @@
   void*         launchTaskTimer;
   SMsgCb*       pMsgCb;  // msg handle
   SStreamState* pState;  // state backend
+  SArray*       pRspMsgList;
+  TdThreadMutex lock;

   // the followings attributes don't be serialized
   int32_t notReadyTasks;

@@ -458,7 +459,9 @@ typedef struct {

 typedef struct {
   int64_t streamId;
-  int32_t taskId;
+  int32_t upstreamTaskId;
+  int32_t downstreamTaskId;
+  int32_t upstreamNodeId;
   int32_t childId;
 } SStreamScanHistoryFinishReq, SStreamTransferReq;

@@ -519,6 +522,17 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR
 int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
 int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);

+typedef struct {
+  int64_t streamId;
+  int32_t upstreamTaskId;
+  int32_t upstreamNodeId;
+  int32_t downstreamId;
+  int32_t downstreamNode;
+} SStreamCompleteHistoryMsg;
+
+int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
+int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
+
 typedef struct {
   int64_t streamId;
   int32_t downstreamTaskId;

@@ -559,7 +573,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
 int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);

 int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
 // int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
-
 void    streamTaskInputFail(SStreamTask* pTask);
 int32_t streamTryExec(SStreamTask* pTask);

@@ -569,17 +582,20 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus);
 bool streamTaskShouldPause(const SStreamStatus* pStatus);
 bool streamTaskIsIdle(const SStreamTask* pTask);

+SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
 int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);

 char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);

 // recover and fill history
-void    streamPrepareNdoCheckDownstream(SStreamTask* pTask);
-int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
+void    streamTaskCheckDownstreamTasks(SStreamTask* pTask);
+int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
 int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
 int32_t streamTaskCheckStatus(SStreamTask* pTask);
+int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
+                           SRpcHandleInfo* pRpcInfo, int32_t taskId);
 int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
 int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask);
 int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
 int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
 int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
 void    streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask);

@@ -593,6 +609,9 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask);
 int32_t streamRestoreParam(SStreamTask* pTask);
 int32_t streamSetStatusNormal(SStreamTask* pTask);
 const char* streamGetTaskStatusStr(int32_t status);
+void streamTaskPause(SStreamTask* pTask);
+void streamTaskDisablePause(SStreamTask* pTask);
+void streamTaskEnablePause(SStreamTask* pTask);

 // source level
 int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);

@@ -604,8 +623,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
 int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);

 // agg level
-int32_t streamAggScanHistoryPrepare(SStreamTask* pTask);
-int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
+int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
+int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo);
+int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);

 // stream task meta
 void streamMetaInit();

@@ -79,6 +79,7 @@ SArray *smGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

   code = 0;
 _OVER:

@@ -740,6 +740,7 @@ SArray *vmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

@@ -25,6 +25,7 @@
 #define SINK_NODE_LEVEL (0)
 extern bool tsDeployOnSnode;

+static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream);
 static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
                                       SVgObj* pVgroup, int32_t fillHistory);
 static void    setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);

@@ -87,10 +88,10 @@ END:

 int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
   if (pStream->smaId != 0) {
-    pTask->outputType = TASK_OUTPUT__SMA;
+    pTask->outputInfo.type = TASK_OUTPUT__SMA;
     pTask->smaSink.smaId = pStream->smaId;
   } else {
-    pTask->outputType = TASK_OUTPUT__TABLE;
+    pTask->outputInfo.type = TASK_OUTPUT__TABLE;
     pTask->tbSink.stbUid = pStream->targetStbUid;
     memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
     pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);

@@ -110,7 +111,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
     SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
     if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
       isShuffle = true;
-      pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
+      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
       pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
       if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
         return -1;

@@ -267,10 +268,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
     return terrno;
   }

+  for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
+    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
+    setTaskUpstreamEpInfo(pTask, pSinkTask);
+  }
+
   return TSDB_CODE_SUCCESS;
 }

-static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
+static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
   SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
   if (pEpInfo == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;

@@ -291,11 +297,11 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
   pDispatcher->nodeId = pTask->info.nodeId;
   pDispatcher->epSet = pTask->info.epSet;

-  pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
+  pDstTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
   pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
 }

-int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
+int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) {
   SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
   if (pEpInfo == NULL) {
     return TSDB_CODE_OUT_OF_MEMORY;

@@ -418,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
     return -1;
   }

-  return setEpToDownstreamTask(pTask, pDownstreamTask);
+  return setTaskUpstreamEpInfo(pTask, pDownstreamTask);
 }

 static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,

@@ -586,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
   return TSDB_CODE_SUCCESS;
 }

+static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
+  SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
+  for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
+    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
+    setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask);
+  }
+}
+
 static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
   SSdb*   pSdb = pMnode->pSdb;
   int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);

@@ -637,6 +651,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
       return code;
     }

+    setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
+    setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
+
     // source level
     return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
   } else if (numOfPlanLevel == 1) {

@@ -66,14 +66,15 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {

   pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
   pTask->inputQueue = streamQueueOpen(512 << 10);
-  pTask->outputQueue = streamQueueOpen(512 << 10);
+  pTask->outputInfo.queue = streamQueueOpen(512 << 10);

-  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
+  if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
     return -1;
   }

+  pTask->initTs = taosGetTimestampMs();
   pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
-  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
+  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
   pTask->pMsgCb = &pSnode->msgCb;
   pTask->chkInfo.version = ver;
   pTask->pMeta = pSnode->pMeta;

@@ -90,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
   pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
   ASSERT(pTask->exec.pExecutor);

+  taosThreadMutexInit(&pTask->lock, NULL);
   streamSetupScheduleTrigger(pTask);

   qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,

@@ -166,11 +168,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {

   int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
   taosWUnLockLatch(&pSnode->pMeta->lock);

-  streamPrepareNdoCheckDownstream(pTask);
   qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
-         streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
+        streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);

+  streamTaskCheckDownstreamTasks(pTask);
   return 0;
 }

@@ -274,7 +275,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
   return 0;
 }

-int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
+int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
   char   *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

@@ -287,12 +288,12 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
   tDecoderClear(&decoder);

   // find task
-  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId);
+  SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId);
   if (pTask == NULL) {
     return -1;
   }
   // do process request
-  if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) {
+  if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) {
     streamMetaReleaseTask(pSnode->pMeta, pTask);
     return -1;
   }

@@ -415,7 +416,7 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
     case TDMT_STREAM_RETRIEVE_RSP:
       return sndProcessTaskRetrieveRsp(pSnode, pMsg);
     case TDMT_STREAM_SCAN_HISTORY_FINISH:
-      return sndProcessTaskRecoverFinishReq(pSnode, pMsg);
+      return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
     case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
       return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
     case TDMT_STREAM_TASK_CHECK:

@@ -250,8 +250,8 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
+int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
+int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqCheckLogInWal(STQ* pTq, int64_t version);

 // sma

@@ -811,17 +811,20 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   pTask->refCnt = 1;
   pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
   pTask->inputQueue = streamQueueOpen(512 << 10);
-  pTask->outputQueue = streamQueueOpen(512 << 10);
+  pTask->outputInfo.queue = streamQueueOpen(512 << 10);

-  if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
+  if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
+    tqError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
     return -1;
   }

+  pTask->initTs = taosGetTimestampMs();
   pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
-  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
+  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
   pTask->pMsgCb = &pTq->pVnode->msgCb;
   pTask->pMeta = pTq->pStreamMeta;

   // backup the initial status, and set it to be TASK_STATUS__INIT
   pTask->chkInfo.version = ver;
   pTask->chkInfo.currentVer = ver;

@@ -880,15 +883,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
     if (pTask->exec.pExecutor == NULL) {
       return -1;
     }

     qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
   }

   // sink
-  if (pTask->outputType == TASK_OUTPUT__SMA) {
+  if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
     pTask->smaSink.vnode = pTq->pVnode;
     pTask->smaSink.smaSink = smaHandleRes;
-  } else if (pTask->outputType == TASK_OUTPUT__TABLE) {
+  } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
     pTask->tbSink.vnode = pTq->pVnode;
     pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;

@@ -913,10 +915,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
     pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
   }

+  // reset the task status from unfinished transaction
+  if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
+    tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr);
+    pTask->status.taskStatus = TASK_STATUS__NORMAL;
+  }
+
+  taosThreadMutexInit(&pTask->lock, NULL);
   streamSetupScheduleTrigger(pTask);

   tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
-         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
+         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms, disable pause",
          vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
          pTask->info.fillHistory, pTask->triggerParam);

@@ -963,28 +972,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
             taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   }

-  SEncoder encoder;
-  int32_t  code;
-  int32_t  len;
-
-  tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
-  if (code < 0) {
-    tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
-    return -1;
-  }
-
-  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
-  ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
-
-  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
-  tEncoderInit(&encoder, (uint8_t*)abuf, len);
-  tEncodeStreamTaskCheckRsp(&encoder, &rsp);
-  tEncoderClear(&encoder);
-
-  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
-
-  tmsgSendRsp(&rspMsg);
-  return 0;
+  return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, taskId);
 }

 int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {

@@ -1051,9 +1039,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   SStreamMeta* pStreamMeta = pTq->pStreamMeta;

   // 2.save task, use the newest commit version as the initial start version of stream task.
+  int32_t taskId = 0;
   taosWLockLatch(&pStreamMeta->lock);
   code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);

+  taskId = pTask->id.taskId;
   int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
   if (code < 0) {
     tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);

@@ -1062,13 +1052,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   }

   taosWUnLockLatch(&pStreamMeta->lock);

-  // 3. It's an fill history task, do nothing. wait for the main task to start it
-  streamPrepareNdoCheckDownstream(pTask);
-
   tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
           streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);

+  // 3. It's an fill history task, do nothing. wait for the main task to start it
+  SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
+  if (p != NULL) {
+    streamTaskCheckDownstreamTasks(pTask);
+  }
+
+  streamMetaReleaseTask(pStreamMeta, p);
   return 0;
 }

@@ -1087,16 +1080,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   }

   // do recovery step 1
-  const char* pId = pTask->id.idStr;
-  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
-          streamGetTaskStatusStr(pTask->status.taskStatus));
+  const char* id = pTask->id.idStr;
+  const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
+  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);

   int64_t st = taosGetTimestampMs();
-  int8_t  schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
-                                                      TASK_SCHED_STATUS__WAITING);
-  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
-    ASSERT(0);
-    return 0;
+
+  // we have to continue retrying to successfully execute the scan history task.
+  while (1) {
+    int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
+                                                       TASK_SCHED_STATUS__WAITING);
+    if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
+      break;
+    }
+
+    tqError("s-task:%s failed to start scan history in current time window, unexpected sched-status:%d, retry in 100ms",
+            id, schedStatus);
+    taosMsleep(100);
   }

   if (!streamTaskRecoverScanStep1Finished(pTask)) {

@@ -1104,16 +1104,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   }

   if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
-    tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
+    tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
+    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
     streamMetaReleaseTask(pMeta, pTask);
     return 0;
   }

   double el = (taosGetTimestampMs() - st) / 1000.0;
-  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el);
+  tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);

   if (pTask->info.fillHistory) {
+    streamTaskEnablePause(pTask);
+
     SVersionRange* pRange = NULL;
     SStreamTask*   pStreamTask = NULL;

@@ -1125,7 +1127,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
              pTask->streamTaskId.taskId, pTask->id.idStr);

       pTask->status.taskStatus = TASK_STATUS__DROPPING;
-      tqDebug("s-task:%s scan-history-task set status to be dropping", pId);
+      tqDebug("s-task:%s scan-history-task set status to be dropping", id);

       streamMetaSaveTask(pMeta, pTask);
       streamMetaReleaseTask(pMeta, pTask);

@@ -1135,18 +1137,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);

     // wait for the stream task get ready for scan history data
-    while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
-           pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+    while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
       tqDebug(
           "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
-          pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
+          id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
       taosMsleep(100);
     }

     // now we can stop the stream task execution
     pStreamTask->status.taskStatus = TASK_STATUS__HALT;
     tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
-            pStreamTask->info.taskLevel, pId);
+            pStreamTask->info.taskLevel, id);

     // if it's an source task, extract the last version in wal.
     streamHistoryTaskSetVerRangeStep2(pTask);

@@ -1154,7 +1155,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {

     if (!streamTaskRecoverScanStep1Finished(pTask)) {
       tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
-              pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId);
+              id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id);
       ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);

       st = taosGetTimestampMs();

@@ -1165,7 +1166,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
       streamSourceScanHistoryData(pTask);

       if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
-        tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
+        tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
         streamMetaReleaseTask(pMeta, pTask);
         return 0;
       }

@@ -1174,7 +1175,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     }

     el = (taosGetTimestampMs() - st) / 1000.0;
-    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
+    tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);

     // 3. notify downstream tasks to transfer executor state after handle all history blocks.
     if (!pTask->status.transferState) {

@@ -1190,20 +1191,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     // 5. resume the related stream task.
     streamTryExec(pTask);

-    pTask->status.taskStatus = TASK_STATUS__DROPPING;
-    tqDebug("s-task:%s scan-history-task set status to be dropping", pId);
-
-    streamMetaSaveTask(pMeta, pTask);
-    streamMetaSaveTask(pMeta, pStreamTask);
-
     streamMetaReleaseTask(pMeta, pTask);
     streamMetaReleaseTask(pMeta, pStreamTask);
-
-    taosWLockLatch(&pMeta->lock);
-    if (streamMetaCommit(pTask->pMeta) < 0) {
-      // persist to disk
-    }
-    taosWUnLockLatch(&pMeta->lock);
   } else {
     // todo update the chkInfo version for current task.
     // this task has an associated history stream task, so we need to scan wal from the end version of

@@ -1212,24 +1201,23 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {

     if (pTask->historyTaskId.taskId == 0) {
       *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
-      tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId,
-              pWindow->skey, pWindow->ekey);
+      tqDebug(
+          "s-task:%s scan history in stream time window completed, no related fill history task, reset the time "
+          "window:%" PRId64 " - %" PRId64,
+          id, pWindow->skey, pWindow->ekey);
     } else {
       tqDebug(
-          "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start "
+          "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start "
           "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
-          pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
+          id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
     }

     // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the
     code = streamTaskScanHistoryDataComplete(pTask);
     streamMetaReleaseTask(pMeta, pTask);

-    // let's start the stream task by extracting data from wal
-    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-      tqStartStreamTasks(pTq);
-    }
-
+    // when all source task complete to scan history data in stream time window, they are allowed to handle stream data
+    // at the same time.
     return code;
   }

@@ -1248,35 +1236,24 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
   int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
   tDecoderClear(&decoder);

-  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
+  tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
+
+  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
   if (pTask == NULL) {
-    tqError("failed to find task:0x%x, it may have been dropped already", req.taskId);
+    tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
     return -1;
   }

   int32_t remain = streamAlignTransferState(pTask);
   if (remain > 0) {
-    tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain);
+    tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
     return 0;
   }

   // transfer the ownership of executor state
-  tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr);
+  tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
+  ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);

-  // related stream task load the state from the state storage backend
-  SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);
-  if (pStreamTask == NULL) {
-    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-    tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId);
-    return -1;
-  }
-
-  // when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure.
-  streamTaskReleaseState(pTask);
-  streamTaskReloadState(pStreamTask);
-  streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask);
-
-  ASSERT(pTask->streamTaskId.taskId != 0);
   pTask->status.transferState = true;

   streamSchedExec(pTask);

@@ -1284,7 +1261,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
   return 0;
 }

-int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
+int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
   char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

@@ -1296,20 +1273,49 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
   tDecodeStreamScanHistoryFinishReq(&decoder, &req);
   tDecoderClear(&decoder);

   // find task
-  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
+  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId);
   if (pTask == NULL) {
-    tqError("failed to find task:0x%x, it may be destroyed, vgId:%d", req.taskId, pTq->pStreamMeta->vgId);
+    tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
+            pTq->pStreamMeta->vgId, req.downstreamTaskId);
     return -1;
   }

-  int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
+  tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);
+
+  int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
   streamMetaReleaseTask(pTq->pStreamMeta, pTask);
   return code;
 }

-int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
-  //
+int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
+  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
+
+  // deserialize
+  SStreamCompleteHistoryMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
+  tDecodeCompleteHistoryDataMsg(&decoder, &req);
+  tDecoderClear(&decoder);
+
+  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId);
+  if (pTask == NULL) {
+    tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
+            pTq->pStreamMeta->vgId, req.upstreamTaskId);
+    return -1;
+  }
+
+  tqDebug("s-task:%s scan-history finish rsp received from task:0x%x", pTask->id.idStr, req.downstreamId);
+
+  int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
+  if (remain > 0) {
+    tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain);
+  } else {
+    streamProcessScanHistoryFinishRsp(pTask);
+  }
+
+  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
   return 0;
 }

@@ -1386,7 +1392,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
   if (pTask != NULL) {
     // even in halt status, the data in inputQ must be processed
     int8_t status = pTask->status.taskStatus;
-    if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
+    if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__SCAN_HISTORY) {
       tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
               pTask->chkInfo.version);
       streamProcessRunReq(pTask);

@@ -1452,32 +1458,45 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
   return 0;
 }

-int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) {
-  if (pTask) {
-    if (!streamTaskShouldPause(&pTask->status)) {
-      tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr);
-      atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
-      atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
-    }
-    streamMetaReleaseTask(pStreamMeta, pTask);
-  } else {
-    return -1;
-  }
-  return 0;
-}
-
 int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
   SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
-  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
-  int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask);
-  if (code != 0) {
-    return code;
+
+  SStreamMeta* pMeta = pTq->pStreamMeta;
+  SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
+  if (pTask == NULL) {
+    tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
+            pReq->taskId);
+
+    // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
+    return TSDB_CODE_SUCCESS;
   }
-  SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId);
-  if (pHistoryTask) {
-    code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask);
+
+  tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
+  streamTaskPause(pTask);
+
+  SStreamTask* pHistoryTask = NULL;
+  if (pTask->historyTaskId.taskId != 0) {
+    pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
+    if (pHistoryTask == NULL) {
+      tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
+              pMeta->vgId, pTask->historyTaskId.taskId);
+
+      streamMetaReleaseTask(pMeta, pTask);
+
+      // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
+      return TSDB_CODE_SUCCESS;
+    }
+
+    tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
+    streamTaskPause(pHistoryTask);
   }
-  return code;
+
+  streamMetaReleaseTask(pMeta, pTask);
+  if (pHistoryTask != NULL) {
+    streamMetaReleaseTask(pMeta, pHistoryTask);
+  }
+
+  return TSDB_CODE_SUCCESS;
 }

 int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {

@@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
       continue;
     }

-    streamTaskCheckDownstreamTasks(pTask);
+    if (pTask->info.fillHistory == 1) {
+      tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr,
+              pTask->streamTaskId.taskId);
+      continue;
+    }
+
+    streamTaskDoCheckDownstreamTasks(pTask);
     streamMetaReleaseTask(pMeta, pTask);
   }
   taosArrayDestroy(pTaskList);

+  taosArrayDestroy(pTaskList);
   return 0;
 }

@@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
     case TDMT_STREAM_TRANSFER_STATE:
       return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_SCAN_HISTORY_FINISH:
-      return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
+      return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
-      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
+      return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
    default:
      vError("unknown msg type:%d in stream queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;

@@ -122,7 +122,7 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
     return;
   }

-  qDebug("%s set fill history start key:%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN);
+  qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN);
   pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
 }

@@ -31,6 +31,12 @@ typedef struct {
   void* timer;
 } SStreamGlobalEnv;

+typedef struct {
+  SEpSet  epset;
+  int32_t taskId;
+  SRpcMsg msg;
+} SStreamContinueExecInfo;
+
 extern SStreamGlobalEnv streamEnv;

 void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);

@@ -54,6 +60,9 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc

 SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);

+int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
+int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
+
 extern int32_t streamBackendId;
 extern int32_t streamBackendCfWrapperId;

@@ -216,15 +216,16 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
 // todo add log
 int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
   int32_t code = 0;
-  if (pTask->outputType == TASK_OUTPUT__TABLE) {
+  int32_t type = pTask->outputInfo.type;
+  if (type == TASK_OUTPUT__TABLE) {
     pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
     destroyStreamDataBlock(pBlock);
-  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
+  } else if (type == TASK_OUTPUT__SMA) {
     pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
     destroyStreamDataBlock(pBlock);
   } else {
-    ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
-    code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
+    ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
+    code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
     if (code != 0) {  // todo failed to add it into the output queue, free it.
       return code;
     }

@@ -274,7 +275,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);

   // there are other dispatch message not response yet
-  if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
     qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
     if (leftRsp > 0) {

@@ -283,9 +284,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   }

   pTask->msgInfo.retryCount = 0;
-  ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
+  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);

-  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus);
+  qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);

   // the input queue of the (down stream) task that receive the output data is full,
   // so the TASK_INPUT_STATUS_BLOCKED is rsp

@@ -309,7 +310,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
   }

   // now ready for next data output
-  atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
+  atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);

   // otherwise, continue dispatch the first block to down stream task in pipeline
   streamDispatchStreamBlock(pTask);

@@ -418,4 +419,16 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
   }
 }

 void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
+
+SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
+  int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
+  for(int32_t i = 0; i < num; ++i) {
+    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
+    if (pInfo->taskId == taskId) {
+      return pInfo;
+    }
+  }
+
+  return NULL;
+}

@@ -25,6 +25,12 @@ typedef struct SBlockName {
   char parTbName[TSDB_TABLE_NAME_LEN];
 } SBlockName;

+static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
+  pMsg->msgType = msgType;
+  pMsg->pCont = pCont;
+  pMsg->contLen = contLen;
+}
+
 static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
   if (tStartEncode(pEncoder) < 0) return -1;
   if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;

@@ -311,13 +317,12 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamSc
   msg.contLen = tlen + sizeof(SMsgHead);
   msg.pCont = buf;
   msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
-  msg.info.noResp = 1;

   tmsgSendReq(pEpSet, &msg);

   const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
-  qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
-         pReq->taskId, vgId);
+  qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
+         pReq->downstreamTaskId, vgId);
   return 0;
 }

@@ -437,7 +442,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
   int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
   ASSERT(numOfBlocks != 0);

-  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
+  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     SStreamDispatchReq req = {0};

     int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;

@@ -467,7 +472,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
     taosArrayDestroyP(req.data, taosMemoryFree);
     taosArrayDestroy(req.dataLen);
     return code;
-  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
     int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt);
     ASSERT(rspCnt == 0);

@@ -545,7 +550,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat

 static void doRetryDispatchData(void* param, void* tmrId) {
   SStreamTask* pTask = param;
-  ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
+  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);

   int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
   if (code != TSDB_CODE_SUCCESS) {

@@ -561,29 +566,29 @@ void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
 }

 int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
-  ASSERT((pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH));
+  STaskOutputInfo* pInfo = &pTask->outputInfo;
+  ASSERT((pInfo->type == TASK_OUTPUT__FIXED_DISPATCH || pInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH));

-  int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
+  int32_t numOfElems = taosQueueItemSize(pInfo->queue->queue);
   if (numOfElems > 0) {
     qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
            numOfElems);
   }

   // to make sure only one dispatch is running
-  int8_t old =
-      atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
+  int8_t old = atomic_val_compare_exchange_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
   if (old != TASK_OUTPUT_STATUS__NORMAL) {
     qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old);
     return 0;
   }

   ASSERT(pTask->msgInfo.pData == NULL);
-  qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);
+  qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pInfo->status);

-  SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
+  SStreamDataBlock* pBlock = streamQueueNextItem(pInfo->queue);
   if (pBlock == NULL) {
-    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
-    qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputStatus);
+    atomic_store_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL);
+    qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pInfo->status);
     return 0;
   }

@@ -599,19 +604,19 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
     }

     qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr,
-           tstrerror(terrno), pTask->outputStatus, retryCount);
+           tstrerror(terrno), pInfo->status, retryCount);

     // todo deal with only partially success dispatch case
     atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
-    if (terrno == TSDB_CODE_APP_IS_STOPPING) {  // in case of this error, do not retry anymore
+    if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
       destroyStreamDataBlock(pTask->msgInfo.pData);
       pTask->msgInfo.pData = NULL;
       return code;
     }

-    if (++retryCount > MAX_CONTINUE_RETRY_COUNT) {  // add to timer to retry
-      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr,
-             retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
+    if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
+      qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms",
+             pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
       streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
       break;
     }

@@ -620,3 +625,93 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
   // this block can not be deleted until it has been sent to downstream task successfully.
   return TSDB_CODE_SUCCESS;
 }
+
+int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
+  if (tStartEncode(pEncoder) < 0) return -1;
+  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1;
+  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
+  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
+  tEndEncode(pEncoder);
+  return pEncoder->pos;
+}
+
+int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
+  if (tStartDecode(pDecoder) < 0) return -1;
+  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
+  if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
+  tEndDecode(pDecoder);
+  return 0;
+}
+
+int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
+  int32_t  len = 0;
+  int32_t  code = 0;
+  SEncoder encoder;
+
+  SStreamCompleteHistoryMsg msg = {
+      .streamId = pReq->streamId,
+      .upstreamTaskId = pReq->upstreamTaskId,
+      .upstreamNodeId = pReq->upstreamNodeId,
+      .downstreamId = pReq->downstreamTaskId,
+      .downstreamNode = pTask->pMeta->vgId,
+  };
+
+  tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
+  if (code < 0) {
+    return code;
+  }
+
+  void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
+  if (pBuf == NULL) {
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+
+  ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
+
+  void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
+
+  tEncoderInit(&encoder, (uint8_t*)abuf, len);
+  tEncodeCompleteHistoryDataMsg(&encoder, &msg);
+  tEncoderClear(&encoder);
+
+  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
+
+  SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
+  initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
+  info.msg.info = *pRpcInfo;
+
+  taosThreadMutexLock(&pTask->lock);
+  if (pTask->pRspMsgList == NULL) {
+    pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
+  }
+  taosArrayPush(pTask->pRspMsgList, &info);
+  taosThreadMutexUnlock(&pTask->lock);
+
+  int32_t num = taosArrayGetSize(pTask->pRspMsgList);
+  qDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
+         num);
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
+  ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
+
+  int32_t num = taosArrayGetSize(pTask->pRspMsgList);
+  for (int32_t i = 0; i < num; ++i) {
+    SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
+    tmsgSendRsp(&pInfo->msg);
+
+    qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel,
+           pInfo->taskId);
+  }
+
+  taosArrayClear(pTask->pRspMsgList);
+  qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
+         num);
+  return 0;
+}

@ -351,30 +351,40 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
|||
}
|
||||
|
||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask == NULL) {
|
||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed",
|
||||
pTask->id.idStr, pTask->streamTaskId.taskId);
|
||||
pTask->status.transferState = false; // reset this value, to avoid transfer state again
|
||||
|
||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
||||
pTask->streamTaskId.taskId);
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
} else {
|
||||
qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
|
||||
pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||
// todo fix race condition
|
||||
streamTaskDisablePause(pTask);
|
||||
streamTaskDisablePause(pStreamTask);
|
||||
|
||||
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
|
||||
|
||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||
|
||||
  // A source stream task must be halted, since the related scan-history-data task starts to scan the history data
  // in step 2. An agg task is switched to halt status below.
  int8_t status = pStreamTask->status.taskStatus;
  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
    ASSERT(status == TASK_STATUS__HALT);
  } else {
    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
    ASSERT(status == TASK_STATUS__SCAN_HISTORY);
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
    qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
    qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
  }

  // wait for the stream task to be idle
  // wait for the stream task to handle all in the inputQ, and to be idle
  waitForTaskIdle(pTask, pStreamTask);

  // Sink tasks do not need to be halted.
@ -399,10 +409,27 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  streamTaskReleaseState(pTask);
  streamTaskReloadState(pStreamTask);

  // reset the status of stream task
  streamSetStatusNormal(pStreamTask);

  pTask->status.taskStatus = TASK_STATUS__DROPPING;
  qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);

  // save to disk
  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  streamMetaSaveTask(pMeta, pStreamTask);
  if (streamMetaCommit(pMeta) < 0) {
    // persist to disk
  }
  taosWUnLockLatch(&pMeta->lock);

  // pause allowed
  streamTaskEnablePause(pStreamTask);
  streamTaskEnablePause(pTask);

  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pTask->pMeta, pStreamTask);
  streamMetaReleaseTask(pMeta, pStreamTask);
  return TSDB_CODE_SUCCESS;
}

@ -480,7 +507,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
      ASSERT(batchSize == 0);
      if (pTask->info.fillHistory && pTask->status.transferState) {
        int32_t code = streamTransferStateToStreamTask(pTask);
        pTask->status.transferState = false;  // reset this value, to avoid transfer state again
        if (code != TSDB_CODE_SUCCESS) {  // todo handle this
          return 0;
        }

@ -550,22 +576,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}

bool streamTaskIsIdle(const SStreamTask* pTask) {
  int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue);
  if (numOfItems > 0) {
    return false;
  }

  numOfItems = taosQallItemSize(pTask->inputQueue->qall);
  if (numOfItems > 0) {
    return false;
  }

  // blocked by downstream task
  if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) {
    return false;
  }

  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
  return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE /* && pTask->status.taskStatus != TASK_STATUS__HALT*/);
}
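
waitForTaskIdle, whose body lies outside this diff, presumably spins on the predicate above until the partner stream task has stopped executing. A sketch of such a polling loop, hedged since the real implementation is not shown (the function name and the 100ms interval are assumptions):

// Sketch (not part of this commit): block until a stream task reports idle.
static void waitUntilIdle(SStreamTask* pTask) {
  while (!streamTaskIsIdle(pTask)) {
    qDebug("s-task:%s not idle yet, check again in 100ms", pTask->id.idStr);
    taosMsleep(100);
  }
}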

int32_t streamTryExec(SStreamTask* pTask) {

@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
    if (!streamTaskShouldStop(&(*ppTask)->status)) {
      int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
      taosRUnLockLatch(&pMeta->lock);
      qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
      qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
      return *ppTask;
    }
  }

@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
  if (ref > 0) {
    qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
    qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
  } else if (ref == 0) {
    ASSERT(streamTaskShouldStop(&pTask->status));
    tFreeStreamTask(pTask);

@ -17,6 +17,18 @@
#include "ttimer.h"
#include "wal.h"

static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);

static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
  ASSERT(pTask->status.downstreamReady == 0);
  pTask->status.downstreamReady = 1;
  int64_t el = (taosGetTimestampMs() - pTask->initTs);

  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
         pTask->id.idStr, numOfReqs, (int32_t)el, streamGetTaskStatusStr(pTask->status.taskStatus));
}

int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
  SStreamScanHistoryReq req;
  streamBuildSourceRecover1Req(pTask, &req, igUntreated);

@ -50,10 +62,6 @@ const char* streamGetTaskStatusStr(int32_t status) {

static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pRange = &pTask->dataRange.range;

  qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
         pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);

  streamSetParamForScanHistory(pTask);
  streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);

@ -72,19 +80,17 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
             walReaderGetCurrentVer(pTask->exec.pWalReader));
    }
  } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    streamSetStatusNormal(pTask);
    streamSetParamForScanHistory(pTask);
    streamAggScanHistoryPrepare(pTask);
    streamTaskScanHistoryPrepare(pTask);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    streamSetStatusNormal(pTask);
    qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
    qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
    streamTaskScanHistoryPrepare(pTask);
  }

  return 0;
}

// check status
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
  SHistDataRange* pRange = &pTask->dataRange;
  STimeWindow*    pWindow = &pRange->window;

@ -96,7 +102,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.reqId = tGenIdPI64();
    req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;

@ -108,7 +114,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
           pWindow->skey, pWindow->ekey, req.reqId);

    streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);

@ -129,11 +135,18 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
      streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    pTask->status.downstreamReady = 1;
    qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);

    streamTaskSetForReady(pTask, 0);
    streamTaskSetRangeStreamCalc(pTask);
    streamTaskLaunchScanHistory(pTask);

    // enable pause when init completed.
    if (pTask->historyTaskId.taskId == 0) {
      streamTaskEnablePause(pTask);
    }

    launchFillHistoryTask(pTask);
  }

  return 0;

@ -153,9 +166,9 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
  qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
         req.downstreamTaskId, req.downstreamNodeId);

  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);

@ -171,7 +184,28 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
}

int32_t streamTaskCheckStatus(SStreamTask* pTask) {
  return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL ? 1 : 0;
  return (pTask->status.downstreamReady == 1) ? 1 : 0;
}

static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
  streamTaskSetForReady(pTask, numOfReqs);
  const char* id = pTask->id.idStr;

  int8_t      status = pTask->status.taskStatus;
  const char* str = streamGetTaskStatusStr(status);

  ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL);
  streamTaskSetRangeStreamCalc(pTask);

  if (status == TASK_STATUS__SCAN_HISTORY) {
    qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
    streamTaskLaunchScanHistory(pTask);
  } else {
    qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str);
  }

  // when the current stream task is ready, check the related fill-history task.
  launchFillHistoryTask(pTask);
}

int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {

@ -179,7 +213,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
  const char* id = pTask->id.idStr;

  if (pRsp->status == 1) {
    if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      bool found = false;

      int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);

@ -201,41 +235,20 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
      if (left == 0) {
        taosArrayDestroy(pTask->checkReqIds);
        pTask->checkReqIds = NULL;
        pTask->status.downstreamReady = 1;

        if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
          qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id,
                 numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus));
          streamTaskLaunchScanHistory(pTask);
        } else {
          ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
          qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
                 streamGetTaskStatusStr(pTask->status.taskStatus));
        }
        doProcessDownstreamReadyRsp(pTask, numOfReqs);
      } else {
        int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
        qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
               pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
      }
    } else {
      ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH);
      ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
      if (pRsp->reqId != pTask->checkReqId) {
        return -1;
      }

      // flag that the downstream tasks have been checked
      ASSERT(pTask->status.downstreamReady == 0);
      pTask->status.downstreamReady = 1;

      ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
      if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
        qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
               streamGetTaskStatusStr(pTask->status.taskStatus));
        streamTaskLaunchScanHistory(pTask);
      } else {
        qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
               streamGetTaskStatusStr(pTask->status.taskStatus));
      }
      doProcessDownstreamReadyRsp(pTask, 1);
    }
  } else {  // not ready, wait for 100ms and retry
    qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,

@ -248,6 +261,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
  return 0;
}

int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
                           SRpcHandleInfo* pRpcInfo, int32_t taskId) {
  SEncoder encoder;
  int32_t  code;
  int32_t  len;

  tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
  if (code < 0) {
    qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
    return -1;
  }

  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, (uint8_t*)abuf, len);
  tEncodeStreamTaskCheckRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};

  tmsgSendRsp(&rspMsg);
  return 0;
}

// common
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);

@ -255,7 +294,7 @@ int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
}

int32_t streamRestoreParam(SStreamTask* pTask) {
  qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr);
  qDebug("s-task:%s restore operator param after scan-history", pTask->id.idStr);
  return qRestoreStreamOperatorOption(pTask->exec.pExecutor);
}

@ -293,23 +332,33 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
}

int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
  SStreamScanHistoryFinishReq req = {
      .streamId = pTask->id.streamId,
      .childId = pTask->info.selfChildId,
      .upstreamTaskId = pTask->id.taskId,
      .upstreamNodeId = pTask->pMeta->vgId,
  };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    pTask->notReadyTasks = 1;
    streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    pTask->notReadyTasks = numOfVgs;

    qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
           pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.taskId = pVgInfo->taskId;
      req.downstreamTaskId = pVgInfo->taskId;
      streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  } else {
    qDebug("s-task:%s no downstream tasks, invoke history finish rsp directly", pTask->id.idStr);
    streamProcessScanHistoryFinishRsp(pTask);
  }

  return 0;

@ -353,7 +402,7 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe

  tmsgSendReq(pEpSet, &msg);
  qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId);
         pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);

  return 0;
}

@ -362,16 +411,16 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };

  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    req.taskId = pTask->fixedEpDispatcher.taskId;
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;

    int32_t numOfVgs = taosArrayGetSize(vgInfo);
    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.taskId = pVgInfo->taskId;
      req.downstreamTaskId = pVgInfo->taskId;
      doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  }

@ -380,10 +429,11 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
}

// agg
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask) {
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
  pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
  qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
         pTask->numOfWaitingUpstream);
  qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
         pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
         streamGetTaskStatusStr(pTask->status.taskStatus));
  return 0;
}

@ -399,27 +449,63 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
  return 0;
}

int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
    ASSERT(left >= 0);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo) {
  int32_t taskLevel = pTask->info.taskLevel;
  ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);

    if (left == 0) {
      int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
      qDebug("s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data",
             pTask->id.idStr, numOfTasks);
  // a sink node does not send the end-of-scan-history msg to its upstream, which is the agg task.
  streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);

  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);

  if (left == 0) {
    int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
    qDebug(
        "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
        "rsp to all upstream tasks",
        pTask->id.idStr, numOfTasks);

    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamAggUpstreamScanHistoryFinish(pTask);
    } else {
      qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
             pTask->id.idStr, taskId, childId, left);
    }

    streamNotifyUpstreamContinue(pTask);

    // sink node does not receive the pause msg from mnode
    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
      streamTaskEnablePause(pTask);
    }
  } else {
    qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
           pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
  }

  return 0;
}
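
The fan-in in streamProcessScanHistoryFinishReq relies on an atomic countdown: every upstream's finish request decrements numOfWaitingUpstream, and only the request that observes zero triggers the agg-finish work and the buffered-rsp flush. A reduced sketch of the pattern (the helper name is hypothetical):

// Sketch (not part of this commit): last-of-N detection with an atomic counter.
// numOfWaitingUpstream is pre-set to the upstream count by streamTaskScanHistoryPrepare().
static bool isLastUpstreamFinished(SStreamTask* pTask) {
  int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
  ASSERT(left >= 0);
  return (left == 0);  // exactly one concurrent caller sees zero
}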

int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
  SStreamMeta* pMeta = pTask->pMeta;

  // executed in the scan-history-complete callback msg; ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  taosWUnLockLatch(&pMeta->lock);

  streamTaskEnablePause(pTask);

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    streamSchedExec(pTask);
  }

  return TSDB_CODE_SUCCESS;
}

static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;

@ -434,7 +520,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  }

  // check if downstream tasks have been ready
  streamTaskCheckDownstreamTasks(pHTask);
  streamTaskDoCheckDownstreamTasks(pHTask);
}

typedef struct SStreamTaskRetryInfo {

@ -500,7 +586,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {

// todo fix the bug: 2. race condition
// a fill-history task needs to be started.
int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      hTaskId = pTask->historyTaskId.taskId;

@ -538,7 +624,6 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) {
}

int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }

@ -555,16 +640,6 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
    return -1;
  }

  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);

  // ready to process data from inputQ
  streamSetStatusNormal(pTask);
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

  taosWLockLatch(&pMeta->lock);
  streamMetaSaveTask(pMeta, pTask);
  taosWUnLockLatch(&pMeta->lock);

  return 0;
}

@ -661,54 +736,105 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}
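
The finish-req now carries upstreamTaskId/upstreamNodeId/downstreamTaskId instead of a single taskId. A hedged round-trip sketch showing how this encode/decode pair is typically driven; tDecoderInit/tDecoderClear are assumed to mirror the encoder API used above, and the fixed buffer size is illustrative only (real callers size the payload with tEncodeSize first):

// Sketch (not part of this commit): serialize a finish-req and read it back.
static int32_t roundTripFinishReq(const SStreamScanHistoryFinishReq* pIn, SStreamScanHistoryFinishReq* pOut) {
  uint8_t buf[128];  // illustrative size

  SEncoder encoder;
  tEncoderInit(&encoder, buf, sizeof(buf));
  if (tEncodeStreamScanHistoryFinishReq(&encoder, pIn) < 0) return -1;
  int32_t len = encoder.pos;  // bytes written; the encoder returns pos on success above
  tEncoderClear(&encoder);

  SDecoder decoder;
  tDecoderInit(&decoder, buf, len);
  if (tDecodeStreamScanHistoryFinishReq(&decoder, pOut) < 0) return -1;
  tDecoderClear(&decoder);
  return 0;  // pOut now mirrors pIn field-for-field
}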

// todo handle race condition, this task may be destroyed
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  if (pTask->historyTaskId.taskId == 0) {
    SHistDataRange* pRange = &pTask->dataRange;
    qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
           " - %" PRId64,
           pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
  } else {
    // calculate the correct start time window, and start to handle the history data for the main task.
    if (pTask->historyTaskId.taskId != 0) {
      // check downstream tasks for associated scan-history-data tasks
      streamCheckHistoryTaskDownstream(pTask);
    SHistDataRange* pRange = &pTask->dataRange;

      // launch current task
      SHistDataRange* pRange = &pTask->dataRange;
      int64_t ekey = pRange->window.ekey + 1;
      int64_t ver = pRange->range.minVer;
    int64_t ekey = pRange->window.ekey + 1;
    int64_t ver = pRange->range.minVer;

      pRange->window.skey = ekey;
      pRange->window.ekey = INT64_MAX;
      pRange->range.minVer = 0;
      pRange->range.maxVer = ver;
    pRange->window.skey = ekey;
    pRange->window.ekey = INT64_MAX;
    pRange->range.minVer = 0;
    pRange->range.maxVer = ver;

      qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
             ", ver range:%" PRId64 " - %" PRId64,
             pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
             pRange->range.maxVer);
    } else {
      SHistDataRange* pRange = &pTask->dataRange;
      qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
             ", ver range:%" PRId64 " - %" PRId64,
             pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
    }

    ASSERT(pTask->status.downstreamReady == 0);

    // check downstream tasks for itself
    streamTaskCheckDownstreamTasks(pTask);
    qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
           ", verRang:%" PRId64 " - %" PRId64,
           pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
           pRange->range.maxVer);
  }
}

void launchFillHistoryTask(SStreamTask* pTask) {
  int32_t tId = pTask->historyTaskId.taskId;
  if (tId == 0) {
    return;
  }

  ASSERT(pTask->status.downstreamReady == 1);
  qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId);

  // launch associated fill history task
  streamLaunchFillHistoryTask(pTask);
}

void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
  if (pTask->info.fillHistory) {
    qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
    return;
  }

  ASSERT(pTask->status.downstreamReady == 0);

  // check downstream tasks for itself
  streamTaskDoCheckDownstreamTasks(pTask);
}

void streamTaskPause(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  int64_t st = taosGetTimestampMs();
  while (!pTask->status.pauseAllowed) {
    qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId);
    taosMsleep(100);
  }

  atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);

  int64_t el = taosGetTimestampMs() - st;
  qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
         streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
}

// todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) {
  // pre-condition check
  const char* id = pTask->id.idStr;
  while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
    taosMsleep(100);
    qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, check in 100ms", id);
  }

  qDebug("s-task:%s disable task pause", id);
  pTask->status.pauseAllowed = 0;
}

void streamTaskEnablePause(SStreamTask* pTask) {
  qDebug("s-task:%s enable task pause", pTask->id.idStr);
  pTask->status.pauseAllowed = 1;
}
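
streamTaskPause spins until status.pauseAllowed is set, while Disable/EnablePause bracket code that must not observe a pause, as streamTransferStateToStreamTask does around the state transfer. A sketch of the intended bracketing (the function name is hypothetical):

// Sketch (not part of this commit): guard a critical section against pause.
// A concurrent streamTaskPause() busy-waits on status.pauseAllowed meanwhile.
static void doPauseSensitiveWork(SStreamTask* pTask) {
  streamTaskDisablePause(pTask);  // waits out an in-flight pause, then forbids new ones
  // ... work that must not see TASK_STATUS__PAUSE, e.g. state transfer + meta commit ...
  streamTaskEnablePause(pTask);   // pause requests may proceed again
}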

@ -13,6 +13,8 @@
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <libs/transport/trpc.h>
#include <streamInt.h>
#include "executor.h"
#include "tstream.h"
#include "wal.h"

@ -44,7 +46,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;

  addToTaskset(pTaskList, pTask);
  return pTask;

@ -74,7 +76,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
  if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;

  if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;

@ -109,19 +111,19 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
  }

  if (pTask->outputType == TASK_OUTPUT__TABLE) {
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
    if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FETCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }

@ -137,7 +139,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
  if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;

  if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;

@ -179,21 +181,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

  if (pTask->outputType == TASK_OUTPUT__TABLE) {
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SMA) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FETCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
    if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }

@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  return 0;
}

static void freeItem(void* p) {
  SStreamContinueExecInfo* pInfo = p;
  rpcFreeCont(pInfo->msg.pCont);
}

void tFreeStreamTask(SStreamTask* pTask) {
  qDebug("free s-task:%s", pTask->id.idStr);

@ -211,8 +218,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
    streamQueueClose(pTask->inputQueue);
  }

  if (pTask->outputQueue) {
    streamQueueClose(pTask->outputQueue);
  if (pTask->outputInfo.queue) {
    streamQueueClose(pTask->outputInfo.queue);
  }

  if (pTask->exec.qmsg) {

@ -229,11 +236,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
  }

  taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);
  if (pTask->outputType == TASK_OUTPUT__TABLE) {
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
    taosMemoryFree(pTask->tbSink.pTSchema);
    tSimpleHashCleanup(pTask->tbSink.pTblInfo);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    taosArrayDestroy(pTask->checkReqIds);
    pTask->checkReqIds = NULL;

@ -251,5 +258,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
    tSimpleHashCleanup(pTask->pNameMap);
  }

  if (pTask->pRspMsgList != NULL) {
    taosArrayDestroyEx(pTask->pRspMsgList, freeItem);
    pTask->pRspMsgList = NULL;
  }

  taosThreadMutexDestroy(&pTask->lock);
  taosMemoryFree(pTask);
}