Merge pull request #23768 from taosdata/fix/fixChkpRespMemleak
fix mem leak
This commit is contained in:
commit
421a154d2a
|
@ -34,17 +34,16 @@ extern "C" {
|
||||||
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
|
||||||
#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F)
|
#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F)
|
||||||
|
|
||||||
|
#define TASK_DOWNSTREAM_READY 0x0
|
||||||
|
#define TASK_DOWNSTREAM_NOT_READY 0x1
|
||||||
|
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
|
||||||
|
#define TASK_UPSTREAM_NEW_STAGE 0x3
|
||||||
|
|
||||||
#define TASK_DOWNSTREAM_READY 0x0
|
#define NODE_ROLE_UNINIT 0x1
|
||||||
#define TASK_DOWNSTREAM_NOT_READY 0x1
|
#define NODE_ROLE_LEADER 0x2
|
||||||
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
|
#define NODE_ROLE_FOLLOWER 0x3
|
||||||
#define TASK_UPSTREAM_NEW_STAGE 0x3
|
|
||||||
|
|
||||||
#define NODE_ROLE_UNINIT 0x1
|
#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0)
|
||||||
#define NODE_ROLE_LEADER 0x2
|
|
||||||
#define NODE_ROLE_FOLLOWER 0x3
|
|
||||||
|
|
||||||
#define HAS_RELATED_FILLHISTORY_TASK(_t) ((_t)->hTaskInfo.id.taskId != 0)
|
|
||||||
#define CLEAR_RELATED_FILLHISTORY_TASK(_t) \
|
#define CLEAR_RELATED_FILLHISTORY_TASK(_t) \
|
||||||
do { \
|
do { \
|
||||||
(_t)->hTaskInfo.id.taskId = 0; \
|
(_t)->hTaskInfo.id.taskId = 0; \
|
||||||
|
@ -159,7 +158,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
SArray* submits; // SArray<SPackedSubmit>
|
SArray* submits; // SArray<SPackedSubmit>
|
||||||
} SStreamMergedSubmit;
|
} SStreamMergedSubmit;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -251,7 +250,7 @@ typedef struct {
|
||||||
} SScanhistoryDataInfo;
|
} SScanhistoryDataInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t idleDuration; // idle time before use time slice the continue execute scan-history
|
int32_t idleDuration; // idle time before use time slice the continue execute scan-history
|
||||||
int32_t numOfTicks;
|
int32_t numOfTicks;
|
||||||
tmr_h pTimer;
|
tmr_h pTimer;
|
||||||
int32_t execCount;
|
int32_t execCount;
|
||||||
|
@ -301,10 +300,10 @@ typedef struct SStreamTaskId {
|
||||||
typedef struct SCheckpointInfo {
|
typedef struct SCheckpointInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
int64_t checkpointVer; // latest checkpointId version
|
int64_t checkpointVer; // latest checkpointId version
|
||||||
int64_t processedVer; // already processed ver, that has generated results version.
|
int64_t processedVer; // already processed ver, that has generated results version.
|
||||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||||
int64_t failedId; // record the latest failed checkpoint id
|
int64_t failedId; // record the latest failed checkpoint id
|
||||||
bool dispatchCheckpointTrigger;
|
bool dispatchCheckpointTrigger;
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
|
@ -332,22 +331,22 @@ typedef struct SSTaskBasicInfo {
|
||||||
int32_t selfChildId;
|
int32_t selfChildId;
|
||||||
int32_t totalLevel;
|
int32_t totalLevel;
|
||||||
int8_t taskLevel;
|
int8_t taskLevel;
|
||||||
int8_t fillHistory; // is fill history task or not
|
int8_t fillHistory; // is fill history task or not
|
||||||
int64_t triggerParam; // in msec
|
int64_t triggerParam; // in msec
|
||||||
} SSTaskBasicInfo;
|
} SSTaskBasicInfo;
|
||||||
|
|
||||||
typedef struct SStreamDispatchReq SStreamDispatchReq;
|
typedef struct SStreamDispatchReq SStreamDispatchReq;
|
||||||
typedef struct STokenBucket STokenBucket;
|
typedef struct STokenBucket STokenBucket;
|
||||||
typedef struct SMetaHbInfo SMetaHbInfo;
|
typedef struct SMetaHbInfo SMetaHbInfo;
|
||||||
|
|
||||||
typedef struct SDispatchMsgInfo {
|
typedef struct SDispatchMsgInfo {
|
||||||
SStreamDispatchReq* pData; // current dispatch data
|
SStreamDispatchReq* pData; // current dispatch data
|
||||||
int8_t dispatchMsgType;
|
int8_t dispatchMsgType;
|
||||||
int16_t msgType; // dispatch msg type
|
int16_t msgType; // dispatch msg type
|
||||||
int32_t retryCount; // retry send data count
|
int32_t retryCount; // retry send data count
|
||||||
int64_t startTs; // dispatch start time, record total elapsed time for dispatch
|
int64_t startTs; // dispatch start time, record total elapsed time for dispatch
|
||||||
SArray* pRetryList; // current dispatch successfully completed node of downstream
|
SArray* pRetryList; // current dispatch successfully completed node of downstream
|
||||||
void* pTimer; // used to dispatch data after a given time duration
|
void* pTimer; // used to dispatch data after a given time duration
|
||||||
} SDispatchMsgInfo;
|
} SDispatchMsgInfo;
|
||||||
|
|
||||||
typedef struct STaskQueue {
|
typedef struct STaskQueue {
|
||||||
|
@ -356,8 +355,8 @@ typedef struct STaskQueue {
|
||||||
} STaskQueue;
|
} STaskQueue;
|
||||||
|
|
||||||
typedef struct STaskSchedInfo {
|
typedef struct STaskSchedInfo {
|
||||||
int8_t status;
|
int8_t status;
|
||||||
void* pTimer;
|
void* pTimer;
|
||||||
} STaskSchedInfo;
|
} STaskSchedInfo;
|
||||||
|
|
||||||
typedef struct SSinkRecorder {
|
typedef struct SSinkRecorder {
|
||||||
|
@ -391,7 +390,7 @@ typedef struct SHistoryTaskInfo {
|
||||||
int32_t tickCount;
|
int32_t tickCount;
|
||||||
int32_t retryTimes;
|
int32_t retryTimes;
|
||||||
int32_t waitInterval;
|
int32_t waitInterval;
|
||||||
int64_t haltVer; // offset in wal when halt the stream task
|
int64_t haltVer; // offset in wal when halt the stream task
|
||||||
} SHistoryTaskInfo;
|
} SHistoryTaskInfo;
|
||||||
|
|
||||||
typedef struct STaskOutputInfo {
|
typedef struct STaskOutputInfo {
|
||||||
|
@ -457,9 +456,9 @@ typedef struct STaskStartInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t readyTs;
|
int64_t readyTs;
|
||||||
int32_t tasksWillRestart;
|
int32_t tasksWillRestart;
|
||||||
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
||||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||||
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
} STaskStartInfo;
|
} STaskStartInfo;
|
||||||
|
|
||||||
|
@ -532,7 +531,7 @@ struct SStreamDispatchReq {
|
||||||
int64_t stage; // nodeId from upstream task
|
int64_t stage; // nodeId from upstream task
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
int32_t msgId; // msg id to identify if the incoming msg from the same sender
|
int32_t msgId; // msg id to identify if the incoming msg from the same sender
|
||||||
int32_t srcVgId;
|
int32_t srcVgId;
|
||||||
int32_t upstreamTaskId;
|
int32_t upstreamTaskId;
|
||||||
int32_t upstreamChildId;
|
int32_t upstreamChildId;
|
||||||
|
@ -658,22 +657,22 @@ typedef struct STaskStatusEntry {
|
||||||
int32_t status;
|
int32_t status;
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
int64_t verStart; // start version in WAL, only valid for source task
|
int64_t verStart; // start version in WAL, only valid for source task
|
||||||
int64_t verEnd; // end version in WAL, only valid for source task
|
int64_t verEnd; // end version in WAL, only valid for source task
|
||||||
int64_t processedVer; // only valid for source task
|
int64_t processedVer; // only valid for source task
|
||||||
int64_t activeCheckpointId; // current active checkpoint id
|
int64_t activeCheckpointId; // current active checkpoint id
|
||||||
bool checkpointFailed; // denote if the checkpoint is failed or not
|
bool checkpointFailed; // denote if the checkpoint is failed or not
|
||||||
double inputQUsed; // in MiB
|
double inputQUsed; // in MiB
|
||||||
double inputRate;
|
double inputRate;
|
||||||
double sinkQuota; // existed quota size for sink task
|
double sinkQuota; // existed quota size for sink task
|
||||||
double sinkDataSize; // sink to dest data size
|
double sinkDataSize; // sink to dest data size
|
||||||
} STaskStatusEntry;
|
} STaskStatusEntry;
|
||||||
|
|
||||||
typedef struct SStreamHbMsg {
|
typedef struct SStreamHbMsg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t numOfTasks;
|
int32_t numOfTasks;
|
||||||
SArray* pTaskStatus; // SArray<STaskStatusEntry>
|
SArray* pTaskStatus; // SArray<STaskStatusEntry>
|
||||||
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
|
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
|
||||||
} SStreamHbMsg;
|
} SStreamHbMsg;
|
||||||
|
|
||||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
|
||||||
|
@ -697,7 +696,7 @@ typedef struct SNodeUpdateInfo {
|
||||||
} SNodeUpdateInfo;
|
} SNodeUpdateInfo;
|
||||||
|
|
||||||
typedef struct SStreamTaskNodeUpdateMsg {
|
typedef struct SStreamTaskNodeUpdateMsg {
|
||||||
int32_t transId; // to identify the msg
|
int32_t transId; // to identify the msg
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
SArray* pNodeList; // SArray<SNodeUpdateInfo>
|
SArray* pNodeList; // SArray<SNodeUpdateInfo>
|
||||||
|
@ -754,12 +753,13 @@ const char* streamTaskGetStatusStr(ETaskStatus status);
|
||||||
void streamTaskResetStatus(SStreamTask* pTask);
|
void streamTaskResetStatus(SStreamTask* pTask);
|
||||||
void streamTaskSetStatusReady(SStreamTask* pTask);
|
void streamTaskSetStatusReady(SStreamTask* pTask);
|
||||||
|
|
||||||
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
||||||
|
|
||||||
// recover and fill history
|
// recover and fill history
|
||||||
void streamTaskCheckDownstream(SStreamTask* pTask);
|
void streamTaskCheckDownstream(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage);
|
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||||
|
int64_t* oldStage);
|
||||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||||
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
|
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
|
||||||
|
@ -785,17 +785,17 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer)
|
||||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
||||||
void streamTaskResume(SStreamTask* pTask);
|
void streamTaskResume(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
|
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
|
||||||
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
||||||
int32_t streamTaskReloadState(SStreamTask* pTask);
|
int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||||
|
|
||||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
|
@ -804,7 +804,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
|
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
|
||||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
// agg level
|
// agg level
|
||||||
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
|
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
|
||||||
|
@ -841,7 +841,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
|
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
|
||||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
|
||||||
void streamTaskClearCheckInfo(SStreamTask* pTask);
|
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
|
||||||
int32_t streamAlignTransferState(SStreamTask* pTask);
|
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
|
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
|
||||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||||
|
|
|
@ -14,8 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "vnd.h"
|
|
||||||
#include "stream.h"
|
#include "stream.h"
|
||||||
|
#include "vnd.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
|
@ -83,7 +83,7 @@ void tqDestroyTqHandle(void* data) {
|
||||||
taosMemoryFree(pData->msg);
|
taosMemoryFree(pData->msg);
|
||||||
pData->msg = NULL;
|
pData->msg = NULL;
|
||||||
}
|
}
|
||||||
if (pData->block != NULL){
|
if (pData->block != NULL) {
|
||||||
blockDataDestroy(pData->block);
|
blockDataDestroy(pData->block);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -586,9 +586,9 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
bool exec = tqIsHandleExec(pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
|
|
||||||
if(exec){
|
if (exec) {
|
||||||
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
|
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
|
||||||
pHandle->subKey, pHandle);
|
pHandle->subKey, pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
continue;
|
continue;
|
||||||
|
@ -705,12 +705,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
} else {
|
} else {
|
||||||
while(1){
|
while (1) {
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
bool exec = tqIsHandleExec(pHandle);
|
bool exec = tqIsHandleExec(pHandle);
|
||||||
if(exec){
|
if (exec) {
|
||||||
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
|
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
|
||||||
pHandle->subKey, pHandle);
|
pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
continue;
|
continue;
|
||||||
|
@ -719,7 +719,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
req.newConsumerId);
|
req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_store_32(&pHandle->epoch, 0);
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
|
@ -851,11 +851,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
|
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// // reset the task status from unfinished transaction
|
// // reset the task status from unfinished transaction
|
||||||
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
// if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
||||||
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
|
// tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
|
||||||
// pTask->status.taskStatus = TASK_STATUS__READY;
|
// pTask->status.taskStatus = TASK_STATUS__READY;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
streamTaskResetUpstreamStageInfo(pTask);
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
|
@ -891,9 +891,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
char* msgStr = pMsg->pCont;
|
char* msgStr = pMsg->pCont;
|
||||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
SStreamTaskCheckReq req;
|
SStreamTaskCheckReq req;
|
||||||
|
@ -917,8 +917,9 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// only the leader node handle the check request
|
// only the leader node handle the check request
|
||||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||||
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
tqError(
|
||||||
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
||||||
|
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
||||||
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
|
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
|
||||||
} else {
|
} else {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
|
||||||
|
@ -928,7 +929,8 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
streamTaskGetStatus(pTask, &p);
|
||||||
tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
|
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
|
||||||
|
") task:0x%x (vgId:%d), check_status:%d",
|
||||||
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
||||||
|
@ -994,7 +996,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
||||||
|
|
||||||
// 1.deserialize msg and build task
|
// 1.deserialize msg and build task
|
||||||
int32_t size = sizeof(SStreamTask);
|
int32_t size = sizeof(SStreamTask);
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, size);
|
SStreamTask* pTask = taosMemoryCalloc(1, size);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
|
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
|
||||||
|
@ -1024,7 +1026,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
streamMetaWUnLock(pStreamMeta);
|
streamMetaWUnLock(pStreamMeta);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
|
||||||
|
tstrerror(code));
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1064,7 +1067,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
SVersionRange *pRange = &pTask->dataRange.range;
|
SVersionRange* pRange = &pTask->dataRange.range;
|
||||||
|
|
||||||
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
|
@ -1076,7 +1079,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
} else {
|
} else {
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
", do secondary scan-history from WAL after halt the related stream task:%s",
|
", do secondary scan-history from WAL after halt the related stream task:%s",
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
|
||||||
pStreamTask->id.idStr);
|
pStreamTask->id.idStr);
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
@ -1090,7 +1093,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||||
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
/*int8_t status = */streamTaskSetSchedStatusInactive(pTask);
|
/*int8_t status = */ streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
|
||||||
// now the fill-history task starts to scan data from wal files.
|
// now the fill-history task starts to scan data from wal files.
|
||||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||||
|
@ -1119,7 +1122,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamTaskGetStatus(pTask, &pStatus);
|
streamTaskGetStatus(pTask, &pStatus);
|
||||||
|
|
||||||
// avoid multi-thread exec
|
// avoid multi-thread exec
|
||||||
while(1) {
|
while (1) {
|
||||||
int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
|
int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
|
||||||
if (sentinel != 0) {
|
if (sentinel != 0) {
|
||||||
tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
|
tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
|
||||||
|
@ -1160,7 +1163,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
|
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
|
@ -1193,13 +1196,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
|
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
SStreamTask* pStreamTask = NULL;
|
SStreamTask* pStreamTask = NULL;
|
||||||
|
|
||||||
// 1. get the related stream task
|
// 1. get the related stream task
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
tqError("failed to find s-task:0x%"PRIx64", it may have been destroyed, drop related fill-history task:%s",
|
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
|
||||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||||
|
@ -1316,7 +1319,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
|
||||||
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
if (streamTaskReadyToRun(pTask, &p)) {
|
if (streamTaskReadyToRun(pTask, &p)) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
||||||
|
@ -1474,7 +1477,6 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
int32_t level = pTask->info.taskLevel;
|
int32_t level = pTask->info.taskLevel;
|
||||||
if (level == TASK_LEVEL__SINK) {
|
if (level == TASK_LEVEL__SINK) {
|
||||||
if (status == TASK_STATUS__UNINIT) {
|
if (status == TASK_STATUS__UNINIT) {
|
||||||
|
|
||||||
}
|
}
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1516,12 +1518,12 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||||
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||||
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pHTaskId->streamId, pHTaskId->taskId);
|
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||||
if (pHistoryTask) {
|
if (pHistoryTask) {
|
||||||
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
|
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
|
||||||
|
@ -1541,7 +1543,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", vgId,
|
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", vgId,
|
||||||
|
@ -1586,10 +1588,10 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId);
|
tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId);
|
||||||
|
|
||||||
// for test purpose
|
// for test purpose
|
||||||
// if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
// if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
// code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
// code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
// goto FAIL;
|
// goto FAIL;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
|
@ -1660,7 +1662,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1668,7 +1670,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
|
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1680,7 +1682,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -1691,22 +1693,23 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
req.taskId);
|
req.taskId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
|
||||||
if (pTask->status.downstreamReady != 1) {
|
if (pTask->status.downstreamReady != 1) {
|
||||||
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
||||||
pTask->checkpointingId = req.checkpointId;
|
pTask->checkpointingId = req.checkpointId;
|
||||||
|
|
||||||
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
||||||
", set it failure", pTask->id.idStr, req.checkpointId);
|
", set it failure",
|
||||||
|
pTask->id.idStr, req.checkpointId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1716,14 +1719,14 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
||||||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
|
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
|
||||||
pTask->id.idStr, req.checkpointId);
|
pTask->id.idStr, req.checkpointId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1757,13 +1760,13 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
|
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
|
||||||
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
|
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
|
||||||
|
|
||||||
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
|
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1828,7 +1831,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
||||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask == NULL || *ppTask == NULL) {
|
if (ppTask == NULL || *ppTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
||||||
|
@ -1852,7 +1855,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
||||||
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
||||||
if (exist != NULL) {
|
if (exist != NULL) {
|
||||||
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
||||||
req.transId);
|
req.transId);
|
||||||
|
@ -1922,7 +1925,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag",
|
||||||
|
vgId);
|
||||||
pMeta->startInfo.tasksWillRestart = 0;
|
pMeta->startInfo.tasksWillRestart = 0;
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1938,7 +1942,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// the following procedure consume many CPU resource, result in the re-election of leader
|
// the following procedure consume many CPU resource, result in the re-election of leader
|
||||||
// with high probability. So we employ it as a test case for the stream processing framework, with
|
// with high probability. So we employ it as a test case for the stream processing framework, with
|
||||||
// checkpoint/restart/nodeUpdate etc.
|
// checkpoint/restart/nodeUpdate etc.
|
||||||
while(1) {
|
while (1) {
|
||||||
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
||||||
if (startVal == 0) {
|
if (startVal == 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -1989,13 +1993,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*) pMsg->pCont;
|
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont;
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
|
||||||
pReq->taskId);
|
pMeta->vgId, pReq->taskId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2003,8 +2007,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||||
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
|
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
|
||||||
streamTaskClearCheckInfo(pTask);
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
taosArrayClear(pTask->pReadyMsgList);
|
|
||||||
streamTaskSetStatusReady(pTask);
|
streamTaskSetStatusReady(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,9 +18,9 @@
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "tstream.h"
|
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -32,13 +32,13 @@ extern "C" {
|
||||||
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
||||||
#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2
|
#define RETRY_LAUNCH_INTERVAL_INC_RATE 1.2
|
||||||
|
|
||||||
#define MAX_BLOCK_NAME_NUM 1024
|
#define MAX_BLOCK_NAME_NUM 1024
|
||||||
#define DISPATCH_RETRY_INTERVAL_MS 300
|
#define DISPATCH_RETRY_INTERVAL_MS 300
|
||||||
#define MAX_CONTINUE_RETRY_COUNT 5
|
#define MAX_CONTINUE_RETRY_COUNT 5
|
||||||
|
|
||||||
#define META_HB_CHECK_INTERVAL 200
|
#define META_HB_CHECK_INTERVAL 200
|
||||||
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
||||||
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
|
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
|
||||||
|
|
||||||
#define STREAM_TASK_QUEUE_CAPACITY 20480
|
#define STREAM_TASK_QUEUE_CAPACITY 20480
|
||||||
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
|
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
|
||||||
|
@ -87,13 +87,13 @@ struct SStreamQueue {
|
||||||
};
|
};
|
||||||
|
|
||||||
extern SStreamGlobalEnv streamEnv;
|
extern SStreamGlobalEnv streamEnv;
|
||||||
extern int32_t streamBackendId;
|
extern int32_t streamBackendId;
|
||||||
extern int32_t streamBackendCfWrapperId;
|
extern int32_t streamBackendCfWrapperId;
|
||||||
|
|
||||||
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
|
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration);
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask);
|
||||||
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
|
||||||
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
|
||||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
|
||||||
|
@ -114,19 +114,23 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize);
|
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||||
|
int32_t* blockSize);
|
||||||
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
|
||||||
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
|
||||||
const char* streamQueueItemGetTypeStr(int32_t type);
|
const char* streamQueueItemGetTypeStr(int32_t type);
|
||||||
|
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
|
||||||
|
|
||||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
|
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||||
|
int32_t* pLen);
|
||||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||||
|
|
||||||
|
void streamClearChkptReadyMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||||
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||||
|
@ -140,17 +144,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
|
||||||
void streamFreeQitem(SStreamQueueItem* data);
|
void streamFreeQitem(SStreamQueueItem* data);
|
||||||
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
|
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
|
||||||
|
|
||||||
typedef enum UPLOAD_TYPE{
|
typedef enum UPLOAD_TYPE {
|
||||||
UPLOAD_DISABLE = -1,
|
UPLOAD_DISABLE = -1,
|
||||||
UPLOAD_S3 = 0,
|
UPLOAD_S3 = 0,
|
||||||
UPLOAD_RSYNC = 1,
|
UPLOAD_RSYNC = 1,
|
||||||
} UPLOAD_TYPE;
|
} UPLOAD_TYPE;
|
||||||
|
|
||||||
UPLOAD_TYPE getUploadType();
|
UPLOAD_TYPE getUploadType();
|
||||||
int uploadCheckpoint(char* id, char* path);
|
int uploadCheckpoint(char* id, char* path);
|
||||||
int downloadCheckpoint(char* id, char* path);
|
int downloadCheckpoint(char* id, char* path);
|
||||||
int deleteCheckpoint(char* id);
|
int deleteCheckpoint(char* id);
|
||||||
int deleteCheckpointFile(char* id, char* name);
|
int deleteCheckpointFile(char* id, char* name);
|
||||||
|
|
||||||
int32_t onNormalTaskReady(SStreamTask* pTask);
|
int32_t onNormalTaskReady(SStreamTask* pTask);
|
||||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
|
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
|
||||||
|
|
|
@ -13,9 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "streamInt.h"
|
|
||||||
#include "rsync.h"
|
|
||||||
#include "cos.h"
|
#include "cos.h"
|
||||||
|
#include "rsync.h"
|
||||||
|
#include "streamInt.h"
|
||||||
|
|
||||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
@ -122,7 +122,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
|
||||||
pBlock->info.rows = 1;
|
pBlock->info.rows = 1;
|
||||||
pBlock->info.childId = pTask->info.selfChildId;
|
pBlock->info.childId = pTask->info.selfChildId;
|
||||||
|
|
||||||
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
|
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
|
||||||
taosArrayPush(pChkpoint->blocks, pBlock);
|
taosArrayPush(pChkpoint->blocks, pBlock);
|
||||||
|
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
|
@ -170,10 +170,10 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
|
||||||
|
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||||
int64_t checkpointId = pDataBlock->info.version;
|
int64_t checkpointId = pDataBlock->info.version;
|
||||||
|
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// set task status
|
// set task status
|
||||||
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
|
if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
|
||||||
|
@ -185,7 +185,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // todo: remove this when the pipeline checkpoint generating is used.
|
{ // todo: remove this when the pipeline checkpoint generating is used.
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
|
@ -196,10 +196,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
//todo fix race condition: set the status and append checkpoint block
|
// todo fix race condition: set the status and append checkpoint block
|
||||||
int32_t taskLevel = pTask->info.taskLevel;
|
int32_t taskLevel = pTask->info.taskLevel;
|
||||||
if (taskLevel == TASK_LEVEL__SOURCE) {
|
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
|
||||||
|
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
|
||||||
continueDispatchCheckpointBlock(pBlock, pTask);
|
continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
} else { // only one task exists, no need to dispatch downstream info
|
} else { // only one task exists, no need to dispatch downstream info
|
||||||
|
@ -222,20 +223,21 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
if (notReady > 0) {
|
if (notReady > 0) {
|
||||||
stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
|
stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
|
||||||
id, pTask->info.selfChildId, notReady, num);
|
id, pTask->info.selfChildId, notReady, num);
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskLevel == TASK_LEVEL__SINK) {
|
if (taskLevel == TASK_LEVEL__SINK) {
|
||||||
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
|
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
|
||||||
id, num);
|
id, num);
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
streamTaskBuildCheckpoint(pTask);
|
streamTaskBuildCheckpoint(pTask);
|
||||||
} else {
|
} else {
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
|
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
|
||||||
"downstream", id, num);
|
"downstream",
|
||||||
|
id, num);
|
||||||
|
|
||||||
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
||||||
// can start local checkpoint procedure
|
// can start local checkpoint procedure
|
||||||
|
@ -263,7 +265,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (notReady == 0) {
|
if (notReady == 0) {
|
||||||
stDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
|
stDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
|
||||||
pTask->id.idStr);
|
pTask->id.idStr);
|
||||||
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
|
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
|
||||||
} else {
|
} else {
|
||||||
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
@ -273,14 +275,17 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskClearCheckInfo(SStreamTask* pTask) {
|
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
|
||||||
pTask->checkpointingId = 0; // clear the checkpoint id
|
pTask->checkpointingId = 0; // clear the checkpoint id
|
||||||
pTask->chkInfo.failedId = 0;
|
pTask->chkInfo.failedId = 0;
|
||||||
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
||||||
pTask->checkpointNotReadyTasks = 0;
|
pTask->checkpointNotReadyTasks = 0;
|
||||||
pTask->checkpointAlignCnt = 0;
|
pTask->checkpointAlignCnt = 0;
|
||||||
pTask->chkInfo.dispatchCheckpointTrigger = false;
|
pTask->chkInfo.dispatchCheckpointTrigger = false;
|
||||||
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
||||||
|
if (clearChkpReadyMsg) {
|
||||||
|
streamClearChkptReadyMsg(pTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
|
@ -290,7 +295,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
|
||||||
if (ppTask == NULL) {
|
if (ppTask == NULL) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -307,7 +312,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
p->chkInfo.checkpointId = p->checkpointingId;
|
p->chkInfo.checkpointId = p->checkpointingId;
|
||||||
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
||||||
|
|
||||||
streamTaskClearCheckInfo(p);
|
streamTaskClearCheckInfo(p, false);
|
||||||
|
|
||||||
char* str = NULL;
|
char* str = NULL;
|
||||||
streamTaskGetStatus(p, &str);
|
streamTaskGetStatus(p, &str);
|
||||||
|
@ -317,7 +322,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return -1;
|
return -1;
|
||||||
} else { // save the task
|
} else { // save the task
|
||||||
streamMetaSaveTask(pMeta, p);
|
streamMetaSaveTask(pMeta, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,33 +379,32 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo: let's retry send rsp to upstream/mnode
|
// todo: let's retry send rsp to upstream/mnode
|
||||||
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
|
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr,
|
||||||
pTask->checkpointingId, tstrerror(code));
|
pTask->checkpointingId, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int uploadCheckpointToS3(char* id, char* path){
|
static int uploadCheckpointToS3(char* id, char* path) {
|
||||||
TdDirPtr pDir = taosOpenDir(path);
|
TdDirPtr pDir = taosOpenDir(path);
|
||||||
if (pDir == NULL) return -1;
|
if (pDir == NULL) return -1;
|
||||||
|
|
||||||
TdDirEntryPtr de = NULL;
|
TdDirEntryPtr de = NULL;
|
||||||
while ((de = taosReadDir(pDir)) != NULL) {
|
while ((de = taosReadDir(pDir)) != NULL) {
|
||||||
char* name = taosGetDirEntryName(de);
|
char* name = taosGetDirEntryName(de);
|
||||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 ||
|
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
|
||||||
taosDirEntryIsDir(de)) continue;
|
|
||||||
|
|
||||||
char filename[PATH_MAX] = {0};
|
char filename[PATH_MAX] = {0};
|
||||||
if(path[strlen(path) - 1] == TD_DIRSEP_CHAR){
|
if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
|
||||||
snprintf(filename, sizeof(filename), "%s%s", path, name);
|
snprintf(filename, sizeof(filename), "%s%s", path, name);
|
||||||
}else{
|
} else {
|
||||||
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
|
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
char object[PATH_MAX] = {0};
|
char object[PATH_MAX] = {0};
|
||||||
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
||||||
|
|
||||||
if(s3PutObjectFromFile2(filename, object) != 0){
|
if (s3PutObjectFromFile2(filename, object) != 0) {
|
||||||
taosCloseDir(&pDir);
|
taosCloseDir(&pDir);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -411,59 +415,59 @@ static int uploadCheckpointToS3(char* id, char* path){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
UPLOAD_TYPE getUploadType(){
|
UPLOAD_TYPE getUploadType() {
|
||||||
if(strlen(tsSnodeAddress) != 0){
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return UPLOAD_RSYNC;
|
return UPLOAD_RSYNC;
|
||||||
}else if(tsS3StreamEnabled){
|
} else if (tsS3StreamEnabled) {
|
||||||
return UPLOAD_S3;
|
return UPLOAD_S3;
|
||||||
}else{
|
} else {
|
||||||
return UPLOAD_DISABLE;
|
return UPLOAD_DISABLE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int uploadCheckpoint(char* id, char* path){
|
int uploadCheckpoint(char* id, char* path) {
|
||||||
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("uploadCheckpoint parameters invalid");
|
stError("uploadCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(strlen(tsSnodeAddress) != 0){
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return uploadRsync(id, path);
|
return uploadRsync(id, path);
|
||||||
}else if(tsS3StreamEnabled){
|
} else if (tsS3StreamEnabled) {
|
||||||
return uploadCheckpointToS3(id, path);
|
return uploadCheckpointToS3(id, path);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int downloadCheckpoint(char* id, char* path){
|
int downloadCheckpoint(char* id, char* path) {
|
||||||
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("downloadCheckpoint parameters invalid");
|
stError("downloadCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(strlen(tsSnodeAddress) != 0){
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return downloadRsync(id, path);
|
return downloadRsync(id, path);
|
||||||
}else if(tsS3StreamEnabled){
|
} else if (tsS3StreamEnabled) {
|
||||||
return s3GetObjectsByPrefix(id, path);
|
return s3GetObjectsByPrefix(id, path);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int deleteCheckpoint(char* id){
|
int deleteCheckpoint(char* id) {
|
||||||
if(id == NULL || strlen(id) == 0){
|
if (id == NULL || strlen(id) == 0) {
|
||||||
stError("deleteCheckpoint parameters invalid");
|
stError("deleteCheckpoint parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(strlen(tsSnodeAddress) != 0){
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return deleteRsync(id);
|
return deleteRsync(id);
|
||||||
}else if(tsS3StreamEnabled){
|
} else if (tsS3StreamEnabled) {
|
||||||
s3DeleteObjectsByPrefix(id);
|
s3DeleteObjectsByPrefix(id);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int deleteCheckpointFile(char* id, char* name){
|
int deleteCheckpointFile(char* id, char* name) {
|
||||||
char object[128] = {0};
|
char object[128] = {0};
|
||||||
snprintf(object, sizeof(object), "%s/%s", id, name);
|
snprintf(object, sizeof(object), "%s/%s", id, name);
|
||||||
char *tmp = object;
|
char* tmp = object;
|
||||||
s3DeleteObjects((const char**)&tmp, 1);
|
s3DeleteObjects((const char**)&tmp, 1);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,9 +14,9 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
#include "tmisce.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tmisce.h"
|
|
||||||
|
|
||||||
typedef struct SBlockName {
|
typedef struct SBlockName {
|
||||||
uint32_t hashValue;
|
uint32_t hashValue;
|
||||||
|
@ -231,7 +231,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
|
|
||||||
buf = NULL;
|
buf = NULL;
|
||||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
|
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
|
||||||
}
|
}
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
@ -270,7 +270,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
|
||||||
|
|
||||||
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
||||||
stDebug("s-task:%s (level:%d) send check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
stDebug("s-task:%s (level:%d) send check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -343,7 +343,8 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||||
|
|
||||||
// TODO: do not use broadcast
|
// TODO: do not use broadcast
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT ||
|
||||||
|
pDataBlock->info.type == STREAM_TRANS_STATE) {
|
||||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
|
code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -362,7 +363,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId);
|
code = streamSearchAndAddBlock(pTask, pReqs, pDataBlock, numOfVgroups, pDataBlock->info.id.groupId);
|
||||||
if(code != 0) {
|
if (code != 0) {
|
||||||
destroyDispatchMsg(pReqs, numOfVgroups);
|
destroyDispatchMsg(pReqs, numOfVgroups);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -371,13 +372,14 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
||||||
pTask->msgInfo.pData = pReqs;
|
pTask->msgInfo.pData = pReqs;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch, pTask->pMeta->stage);
|
stDebug("s-task:%s build dispatch msg success, msgId:%d, stage:%" PRId64, pTask->id.idStr, pTask->execInfo.dispatch,
|
||||||
|
pTask->pMeta->stage);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
|
static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatchMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t msgId = pTask->execInfo.dispatch;
|
int32_t msgId = pTask->execInfo.dispatch;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
@ -393,8 +395,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
|
stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
|
||||||
id, pTask->info.selfChildId, numOfVgroups, msgId);
|
pTask->info.selfChildId, numOfVgroups, msgId);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVgroups; i++) {
|
for (int32_t i = 0; i < numOfVgroups; i++) {
|
||||||
if (pDispatchMsg[i].blockNum > 0) {
|
if (pDispatchMsg[i].blockNum > 0) {
|
||||||
|
@ -409,7 +411,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
|
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups,
|
||||||
|
msgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -434,20 +437,20 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
||||||
SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
|
SArray* pList = taosArrayDup(pTask->msgInfo.pRetryList, NULL);
|
||||||
taosArrayClear(pTask->msgInfo.pRetryList);
|
taosArrayClear(pTask->msgInfo.pRetryList);
|
||||||
|
|
||||||
SStreamDispatchReq *pReq = pTask->msgInfo.pData;
|
SStreamDispatchReq* pReq = pTask->msgInfo.pData;
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
int32_t numOfFailed = taosArrayGetSize(pList);
|
int32_t numOfFailed = taosArrayGetSize(pList);
|
||||||
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
|
stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch blocks to %d vgroup(s), msgId:%d", id,
|
||||||
id, pTask->info.selfChildId, numOfFailed, msgId);
|
pTask->info.selfChildId, numOfFailed, msgId);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfFailed; i++) {
|
for (int32_t i = 0; i < numOfFailed; i++) {
|
||||||
int32_t vgId = *(int32_t*) taosArrayGet(pList, i);
|
int32_t vgId = *(int32_t*)taosArrayGet(pList, i);
|
||||||
|
|
||||||
for(int32_t j = 0; j < numOfVgroups; ++j) {
|
for (int32_t j = 0; j < numOfVgroups; ++j) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
|
||||||
if (pVgInfo->vgId == vgId) {
|
if (pVgInfo->vgId == vgId) {
|
||||||
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
|
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
|
||||||
|
@ -461,7 +464,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
|
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
|
||||||
|
numOfFailed, msgId);
|
||||||
} else {
|
} else {
|
||||||
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
|
int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId;
|
||||||
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
|
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
|
||||||
|
@ -478,8 +482,8 @@ static void doRetryDispatchData(void* param, void* tmrId) {
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (!streamTaskShouldStop(pTask)) {
|
if (!streamTaskShouldStop(pTask)) {
|
||||||
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
|
||||||
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
|
// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0);
|
||||||
if (streamTaskShouldPause(pTask)) {
|
if (streamTaskShouldPause(pTask)) {
|
||||||
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
|
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
|
||||||
} else {
|
} else {
|
||||||
|
@ -531,10 +535,12 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
||||||
|
pDataBlock->info.parTbName);
|
||||||
} else {
|
} else {
|
||||||
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
||||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
||||||
|
pDataBlock->info.parTbName);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
||||||
|
@ -576,13 +582,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
|
||||||
|
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
|
||||||
|
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
|
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
|
||||||
if (numOfElems > 0) {
|
if (numOfElems > 0) {
|
||||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
|
double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue));
|
||||||
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
|
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id,
|
||||||
|
numOfElems, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
// to make sure only one dispatch is running
|
// to make sure only one dispatch is running
|
||||||
|
@ -618,7 +626,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
int32_t code = doBuildDispatchMsg(pTask, pBlock);
|
int32_t code = doBuildDispatchMsg(pTask, pBlock);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
destroyStreamDataBlock(pBlock);
|
destroyStreamDataBlock(pBlock);
|
||||||
} else { // todo handle build dispatch msg failed
|
} else { // todo handle build dispatch msg failed
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retryCount = 0;
|
int32_t retryCount = 0;
|
||||||
|
@ -641,8 +649,9 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
|
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
stDebug(
|
||||||
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
"s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
|
||||||
|
pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||||
|
|
||||||
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
break;
|
break;
|
||||||
|
@ -665,7 +674,8 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||||
pTask->notReadyTasks = 1;
|
pTask->notReadyTasks = 1;
|
||||||
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
|
||||||
|
&pTask->outputInfo.fixedDispatcher.epSet);
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
|
@ -673,8 +683,8 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
streamTaskGetStatus(pTask, &p);
|
||||||
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
|
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
|
||||||
numOfVgs, p);
|
pTask->id.idStr, numOfVgs, p);
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
@ -698,11 +708,12 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
|
tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
|
||||||
|
|
||||||
stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
|
stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
|
||||||
pInfo->upStreamTaskId);
|
pInfo->upStreamTaskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(pTask->pReadyMsgList);
|
taosArrayClear(pTask->pReadyMsgList);
|
||||||
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num);
|
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
|
||||||
|
num);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -789,7 +800,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
streamTaskGetStatus(pTask, &p);
|
||||||
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, p,
|
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, p,
|
||||||
pReq->downstreamTaskId, vgId);
|
pReq->downstreamTaskId, vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -872,8 +883,8 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
|
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||||
SStreamTask* pTask, int8_t isSucceed) {
|
int8_t isSucceed) {
|
||||||
SStreamChkptReadyInfo info = {0};
|
SStreamChkptReadyInfo info = {0};
|
||||||
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
|
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
|
||||||
|
|
||||||
|
@ -882,7 +893,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pTask->pReadyMsgList, &info);
|
taosArrayPush(pTask->pReadyMsgList, &info);
|
||||||
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList));
|
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr,
|
||||||
|
(int32_t)taosArrayGetSize(pTask->pReadyMsgList));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -932,8 +944,10 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
||||||
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
|
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
|
||||||
info.msg.info.noResp = 1; // refactor later.
|
info.msg.info.noResp = 1; // refactor later.
|
||||||
|
|
||||||
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d",
|
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
|
||||||
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index, req.upstreamNodeId);
|
":0x%x (vgId:%d) idx:%d, vgId:%d",
|
||||||
|
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index,
|
||||||
|
req.upstreamNodeId);
|
||||||
|
|
||||||
if (pTask->pReadyMsgList == NULL) {
|
if (pTask->pReadyMsgList == NULL) {
|
||||||
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
|
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
|
||||||
|
@ -943,6 +957,16 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamClearChkptReadyMsg(SStreamTask* pTask) {
|
||||||
|
if (pTask->pReadyMsgList == NULL) return;
|
||||||
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) {
|
||||||
|
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
|
||||||
|
rpcFreeCont(pInfo->msg.pCont);
|
||||||
|
}
|
||||||
|
taosArrayClear(pTask->pReadyMsgList);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
|
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
@ -965,7 +989,8 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen) {
|
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
|
||||||
|
int32_t* pLen) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
|
@ -1022,7 +1047,7 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
|
||||||
|
|
||||||
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
|
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
|
||||||
stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
|
stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
|
||||||
num);
|
num);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1065,7 +1090,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
|
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
|
||||||
pTask->id.idStr, downstreamId, el);
|
pTask->id.idStr, downstreamId, el);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el);
|
stDebug("s-task:%s dispatch completed, elapsed time:%" PRId64 "ms", pTask->id.idStr, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now ready for next data output
|
// now ready for next data output
|
||||||
|
@ -1084,7 +1109,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int32_t msgId = pTask->execInfo.dispatch;
|
int32_t msgId = pTask->execInfo.dispatch;
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// for test purpose, build the failure case
|
// for test purpose, build the failure case
|
||||||
|
@ -1095,7 +1120,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
|
|
||||||
// follower not handle the dispatch rsp
|
// follower not handle the dispatch rsp
|
||||||
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
|
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
|
||||||
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId);
|
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
|
||||||
|
vgId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1113,8 +1139,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
// flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
// flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||||
// happened too fast.
|
// happened too fast.
|
||||||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
|
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||||
|
@ -1158,16 +1184,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
ASSERT(leftRsp >= 0);
|
ASSERT(leftRsp >= 0);
|
||||||
|
|
||||||
if (leftRsp > 0) {
|
if (leftRsp > 0) {
|
||||||
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting for %d rsp",
|
stDebug(
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp);
|
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting "
|
||||||
|
"for %d rsp",
|
||||||
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp);
|
||||||
} else {
|
} else {
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
|
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s",
|
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s", id,
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(leftRsp >= 0);
|
ASSERT(leftRsp >= 0);
|
||||||
|
@ -1189,7 +1217,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
|
pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);
|
||||||
|
|
||||||
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
} else { // this message has been sent successfully, let's try next one.
|
} else { // this message has been sent successfully, let's try next one.
|
||||||
pTask->msgInfo.retryCount = 0;
|
pTask->msgInfo.retryCount = 0;
|
||||||
|
|
||||||
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||||
|
|
|
@ -15,11 +15,11 @@
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
#include "streamsm.h"
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "tstream.h"
|
#include "tstream.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
#include "streamsm.h"
|
|
||||||
|
|
||||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||||
|
|
||||||
|
@ -309,11 +309,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
|
stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
|
||||||
|
|
||||||
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
||||||
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
|
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
|
||||||
" nextProcessVer:%" PRId64", checkpointCount:%d",
|
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
|
||||||
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
|
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
|
||||||
pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer,
|
pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer,
|
||||||
pStatis->checkpoint);
|
pStatis->checkpoint);
|
||||||
|
|
||||||
// remove the ref by timer
|
// remove the ref by timer
|
||||||
while (pTask->status.timerActive > 0) {
|
while (pTask->status.timerActive > 0) {
|
||||||
|
@ -358,7 +358,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
walCloseReader(pTask->exec.pWalReader);
|
walCloseReader(pTask->exec.pWalReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamClearChkptReadyMsg(pTask);
|
||||||
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
|
||||||
|
|
||||||
if (pTask->msgInfo.pData != NULL) {
|
if (pTask->msgInfo.pData != NULL) {
|
||||||
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
|
||||||
pTask->msgInfo.pData = NULL;
|
pTask->msgInfo.pData = NULL;
|
||||||
|
@ -422,7 +424,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
pTask->status.pSM = streamCreateStateMachine(pTask);
|
pTask->status.pSM = streamCreateStateMachine(pTask);
|
||||||
if (pTask->status.pSM == NULL) {
|
if (pTask->status.pSM == NULL) {
|
||||||
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
|
stError("s-task:%s failed create state-machine for stream task, initialization failed, code:%s", pTask->id.idStr,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,7 +436,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint
|
pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint
|
||||||
pTask->chkInfo.processedVer = ver - 1; // already processed version
|
pTask->chkInfo.processedVer = ver - 1; // already processed version
|
||||||
|
|
||||||
pTask->chkInfo.nextProcessVer = ver; // next processed version
|
pTask->chkInfo.nextProcessVer = ver; // next processed version
|
||||||
pTask->dataRange.range.maxVer = ver;
|
pTask->dataRange.range.maxVer = ver;
|
||||||
pTask->dataRange.range.minVer = ver;
|
pTask->dataRange.range.minVer = ver;
|
||||||
pTask->pMsgCb = pMsgCb;
|
pTask->pMsgCb = pMsgCb;
|
||||||
|
@ -442,7 +444,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
|
|
||||||
pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
|
pTask->outputInfo.pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
|
||||||
if (pTask->outputInfo.pTokenBucket == NULL) {
|
if (pTask->outputInfo.pTokenBucket == NULL) {
|
||||||
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr,
|
||||||
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -451,7 +454,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
|
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
|
||||||
|
|
||||||
TdThreadMutexAttr attr = {0};
|
TdThreadMutexAttr attr = {0};
|
||||||
int code = taosThreadMutexAttrInit(&attr);
|
int code = taosThreadMutexAttrInit(&attr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
|
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
@ -529,8 +532,8 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
|
||||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||||
if (pInfo->nodeId == nodeId) {
|
if (pInfo->nodeId == nodeId) {
|
||||||
epsetAssign(&pInfo->epSet, pEpSet);
|
epsetAssign(&pInfo->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId,
|
||||||
pInfo->taskId, nodeId, buf);
|
nodeId, buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -569,7 +572,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
if (pVgInfo->vgId == nodeId) {
|
if (pVgInfo->vgId == nodeId) {
|
||||||
epsetAssign(&pVgInfo->epSet, pEpSet);
|
epsetAssign(&pVgInfo->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
|
||||||
pVgInfo->taskId, nodeId, buf);
|
pVgInfo->taskId, nodeId, buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -578,7 +581,7 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
if (pDispatcher->nodeId == nodeId) {
|
if (pDispatcher->nodeId == nodeId) {
|
||||||
epsetAssign(&pDispatcher->epSet, pEpSet);
|
epsetAssign(&pDispatcher->epSet, pEpSet);
|
||||||
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
|
||||||
pDispatcher->taskId, nodeId, buf);
|
pDispatcher->taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
@ -586,9 +589,9 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskStop(SStreamTask* pTask) {
|
int32_t streamTaskStop(SStreamTask* pTask) {
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
|
||||||
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
@ -635,7 +638,7 @@ int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
||||||
p->latestUpdateTs = taosGetTimestampMs();
|
p->latestUpdateTs = taosGetTimestampMs();
|
||||||
p->updateCount += 1;
|
p->updateCount += 1;
|
||||||
stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
|
stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId,
|
||||||
numOfNodes, p->updateCount, prevTs);
|
numOfNodes, p->updateCount, prevTs);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
||||||
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
||||||
|
@ -706,7 +709,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||||
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
||||||
|
|
||||||
if (ppStreamTask != NULL) {
|
if (ppStreamTask != NULL) {
|
||||||
|
@ -720,7 +723,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
|
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
|
||||||
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
|
SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue