enh(stream): support retrieve checkpoint data remotely when start stream tasks in vnodes.

This commit is contained in:
Haojun Liao 2024-05-28 09:54:22 +08:00
parent d001a87a58
commit be107b204c
23 changed files with 716 additions and 162 deletions

View File

@ -13,7 +13,7 @@ extern "C" {
void stopRsync();
void startRsync();
int32_t uploadRsync(const char* id, const char* path);
int32_t uploadByRsync(const char* id, const char* path);
int32_t downloadRsync(const char* id, const char* path);
int32_t deleteRsync(const char* id);

View File

@ -321,6 +321,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)

View File

@ -36,6 +36,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);

View File

@ -22,17 +22,17 @@
extern "C" {
#endif
typedef struct SStreamChildEpInfo {
typedef struct SStreamUpstreamEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
SEpSet epSet;
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
} SStreamChildEpInfo;
} SStreamUpstreamEpInfo;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo);
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
@ -171,6 +171,16 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct SRetrieveChkptTriggerReq {
SMsgHead head;
int64_t streamId;
int64_t checkpointId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int64_t downstreamTaskId;
} SRetrieveChkptTriggerReq;
typedef struct {
SMsgHead head;
int64_t streamId;

View File

@ -61,10 +61,11 @@ extern "C" {
// the load and start stream task should be executed after snode has started successfully, since the load of stream
// tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources.
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_VER 4
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
@ -270,13 +271,10 @@ typedef struct SCheckpointInfo {
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
int64_t checkpointingId;
int32_t downstreamAlignNum;
int32_t numOfNotReady;
bool dispatchCheckpointTrigger;
SActiveCheckpointInfo* pActiveInfo;
int64_t msgVer;
int32_t transId;
} SCheckpointInfo;
typedef struct SStreamStatus {
@ -436,7 +434,6 @@ struct SStreamTask {
SHistoryTaskInfo hTaskInfo;
STaskId streamTaskId;
STaskExecStatisInfo execInfo;
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
@ -619,7 +616,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
SStreamUpstreamEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
@ -672,6 +670,17 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
// checkpoint related
int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId);
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId);
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId,
SEpSet* pEpset);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common
@ -682,6 +691,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key);

View File

@ -153,7 +153,7 @@ void startRsync() {
uDebug("[rsync] start server successful");
}
int32_t uploadRsync(const char* id, const char* path) {
int32_t uploadByRsync(const char* id, const char* path) {
int64_t st = taosGetTimestampMs();
char command[PATH_MAX] = {0};
@ -196,6 +196,7 @@ int32_t uploadRsync(const char* id, const char* path) {
return code;
}
// abort from retry if quit
int32_t downloadRsync(const char* id, const char* path) {
int64_t st = taosGetTimestampMs();
int32_t MAX_RETRY = 60;
@ -220,11 +221,10 @@ int32_t downloadRsync(const char* id, const char* path) {
uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command);
while(times++ < MAX_RETRY) {
code = execCommand(command);
if (code != TSDB_CODE_SUCCESS) {
uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, code:%d," ERRNO_ERR_FORMAT, id, path, code,
ERRNO_ERR_DATA);
uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id,
path, times, code, ERRNO_ERR_DATA);
taosSsleep(1);
} else {
int32_t el = taosGetTimestampMs() - st;

View File

@ -89,6 +89,8 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -961,6 +961,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

View File

@ -257,6 +257,7 @@ int tqScanWalAsync(STQ* pTq, bool ckPause);
int32_t tqStopStreamTasksAsync(STQ* pTq);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -1285,7 +1285,8 @@ _checkpoint:
if (pItem && pItem->pStreamTask) {
SStreamTask *pTask = pItem->pStreamTask;
// atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
pTask->chkInfo.checkpointingId = checkpointId;
streamTaskSetActiveCheckpointInfo(pTask, checkpointId);
pTask->chkInfo.checkpointId = checkpointId; // 1pTask->checkpointingId;
pTask->chkInfo.checkpointVer = pItem->submitReqVer;
pTask->info.triggerParam = pItem->fetchResultVer;

View File

@ -725,10 +725,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
}
pTask->pBackend = NULL;
// code = tqExpandStreamTask(pTask, pTq->pStreamMeta);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// sink
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
@ -1092,6 +1088,10 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
// if (pTq->pStreamMeta->vgId == 2) {
// ASSERT(0);
// }
// disable auto rsp to mnode
pRsp->info.handle = NULL;
@ -1140,10 +1140,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
pTask->chkInfo.checkpointingId = req.checkpointId;
pTask->chkInfo.transId = req.transId;
streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); // record the latest failed checkpoint id
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
", transId:%d set it failed",
pTask->id.idStr, req.checkpointId, req.transId);
@ -1182,9 +1179,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) {
int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
" transId:%d already handled, ignore msg and continue process checkpoint",
pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
pTask->id.idStr, checkpointId, req.transId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
@ -1244,6 +1244,10 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}
// this function is needed, do not try to remove it.
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);

View File

@ -699,9 +699,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
if (isLeader && !tsDisableStream) {
streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
@ -839,13 +837,19 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
SStreamTaskState *pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
int32_t tranId = 0;
int64_t activeChkId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId);
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
pTask->id.idStr, activeChkId, tranId);
streamTaskSetStatusReady(pTask);
} else if (pState->state == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
ASSERT(pTask->status.downstreamReady == 0);
/*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
// /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
} else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
}
@ -856,6 +860,57 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { // recv the checkpoint-source/trigger already
int32_t transId = 0;
int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
ASSERT (checkpointId == pReq->checkpointId);
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
// re-send the lost checkpoint-trigger msg to downstream task
SEpSet* pEpset = streamTaskGetDownstreamEpInfo(pTask, pReq->downstreamTaskId);
streamTaskSendCheckpointTriggerMsg(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->downstreamTaskId,
pReq->downstreamNodeId, pEpset);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
if (recv == total) { // add the ts info
tqWarn("s-task:%s all upstream send checkpoint-source/trigger, but not processed yet, wait", pTask->id.idStr);
} else {
tqWarn(
"s-task:%s not all upstream send checkpoint-source/trigger, total recv:%d/%d, wait for all upstream "
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
}
} else { // upstream not recv the checkpoint-source/trigger till now
ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT);
tqWarn(
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
}
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;

View File

@ -422,7 +422,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
// STREAM ============
vInfo("vgId:%d stream task start", vgId);
vInfo("vgId:%d stream task start to take snapshot", vgId);
if (!pReader->streamTaskDone) {
if (pReader->pStreamTaskReader == NULL) {
code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);

View File

@ -842,12 +842,16 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER:
return tqProcessTaskRetrieveTriggerMsg(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
default:

View File

@ -52,6 +52,18 @@ extern "C" {
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
struct SActiveCheckpointInfo {
TdThreadMutex lock;
int32_t transId;
int64_t firstRecvTs; // first time to recv checkpoint trigger info
int64_t activeId; // current active checkpoint id
int64_t failedId;
bool dispatchTrigger;
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
tmr_h pCheckTmr;
};
typedef struct {
int8_t type;
SSDataBlock* pBlock;
@ -81,6 +93,24 @@ struct STokenBucket {
int64_t quotaFillTimestamp; // fill timestamp
};
typedef struct {
int32_t upStreamTaskId;
SEpSet upstreamNodeEpset;
int32_t nodeId;
SRpcMsg msg;
int64_t recvTs;
int32_t transId;
int64_t checkpointId;
} SStreamChkptReadyInfo;
typedef struct {
int64_t sendTs;
int64_t recvTs;
bool recved;
int32_t nodeId;
int32_t taskId;
} STaskTriggerSendInfo;
struct SStreamQueue {
STaosQueue* pQueue;
STaosQall* qall;
@ -113,6 +143,9 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
void clearBufferedDispatchMsg(SStreamTask* pTask);
int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId,
int32_t vgId, SEpSet* pEpset);
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
@ -131,11 +164,13 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);
void streamClearChkptReadyMsg(SStreamTask* pTask);
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,

View File

@ -386,12 +386,12 @@ int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId
return code;
}
int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) {
int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkpPath, checkpointId, defaultPath);
return rebuildFromRemoteChkp_s3(key, chkptPath, checkpointId, defaultPath);
} else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath);
return rebuildFromRemoteChkp_rsync(key, chkptPath, checkpointId, defaultPath);
} else {
stError("%s no remote backup checkpoint data for:%" PRId64, key, checkpointId);
}
@ -538,7 +538,9 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
taosMulMkDir(defaultPath);
}
char* checkpointRoot = taosMemoryCalloc(1, strlen(path) + 256);
int32_t pathLen = strlen(path) + 256;
char* checkpointRoot = taosMemoryCalloc(1, pathLen);
sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
if (!taosIsDir(checkpointRoot)) {
@ -548,9 +550,9 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
stDebug("%s check local backend dir:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256);
char* chkptPath = taosMemoryCalloc(1, pathLen);
if (chkptId > 0) {
sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId);
snprintf(chkptPath, pathLen, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath);
if (code != 0) {

View File

@ -40,7 +40,7 @@ static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL);
*oldStage = pInfo->stage;

View File

@ -20,9 +20,9 @@
typedef struct {
ECHECKPOINT_BACKUP_TYPE type;
char* taskId;
int64_t chkpId;
char* taskId;
int64_t chkpId;
SStreamTask* pTask;
int64_t dbRefId;
void* pMeta;
@ -35,22 +35,33 @@ static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType);
static int32_t streamAlignCheckpoint(SStreamTask* pTask);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
if (old == 0) {
stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList);
bool allSend = true;
taosThreadMutexLock(&pActiveInfo->lock);
int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList);
if (numOfRecv < numOfUpstreams) {
stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d",
pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams);
allSend = false;
}
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
taosThreadMutexUnlock(&pActiveInfo->lock);
return allSend;
}
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pChkpoint->type = checkpointType;
@ -58,12 +69,13 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.version = pTask->chkInfo.checkpointingId;
pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.transId; // NOTE: set the transId
pBlock->info.version = pTask->chkInfo.pActiveInfo->activeId;
pBlock->info.window.ekey = pBlock->info.window.skey = pTask->chkInfo.pActiveInfo->transId; // NOTE: set the transId
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
@ -71,6 +83,14 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
taosArrayPush(pChkpoint->blocks, pBlock);
taosMemoryFree(pBlock);
terrno = 0;
return pChkpoint;
}
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -87,23 +107,43 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT(code == TSDB_CODE_SUCCESS);
pTask->chkInfo.transId = pReq->transId;
pTask->chkInfo.checkpointingId = pReq->checkpointId;
pTask->chkInfo.pActiveInfo->transId = pReq->transId;
pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId;
pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// and this is the last item in the inputQ.
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId,
SEpSet* pEpset) {
SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType);
pChkpoint->srcTaskId = pTask->id.taskId;
pChkpoint->srcVgId = pTask->pMeta->vgId;
int32_t code = streamTaskBuildAndSendTriggerMsg(pTask, pChkpoint, dstTaskId, vgId, pEpset);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s build and send checkpoint-trigger dispatch msg succ, stage:%" PRId64, pTask->id.idStr,
pTask->pMeta->stage);
} else {
// todo handle send data failure
stError("s-task:%s failed to build and send trigger msg", pTask->id.idStr);
}
return code;
}
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
if (code == 0) {
ASSERT(pTask->chkInfo.dispatchCheckpointTrigger == false);
ASSERT(pTask->chkInfo.pActiveInfo->dispatchTrigger == false);
streamDispatchStreamBlock(pTask);
} else {
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
@ -127,8 +167,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// set task status
if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) {
pTask->chkInfo.checkpointingId = checkpointId;
pTask->chkInfo.transId = transId;
pTask->chkInfo.pActiveInfo->activeId = checkpointId;
pTask->chkInfo.pActiveInfo->transId = transId;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) {
@ -136,12 +176,20 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
if (pActive->pCheckTmr == NULL) {
pActive->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 10000, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActive->pCheckTmr);
}
}
// todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
@ -161,23 +209,19 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
int32_t notReady = streamAlignCheckpoint(pTask);
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
if (notReady > 0) {
stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
id, pTask->info.selfChildId, notReady, num);
bool allSend = streamTaskIsAllUpstreamSendTrigger(pTask);
if (!allSend) {
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
int32_t num = streamTaskGetNumOfUpstream(pTask);
if (taskLevel == TASK_LEVEL__SINK) {
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
id, num);
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
streamFreeQitem((SStreamQueueItem*)pBlock);
streamTaskBuildCheckpoint(pTask);
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, continue forwards msg", id,
num);
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
@ -216,14 +260,10 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
}
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.checkpointingId = 0; // clear the checkpoint id
pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->chkInfo.numOfNotReady = 0;
pTask->chkInfo.transId = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false;
pTask->chkInfo.downstreamAlignNum = 0;
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask);
@ -317,9 +357,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
}
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId);
}
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
@ -363,7 +403,7 @@ int32_t uploadCheckpointData(void* param) {
SAsyncUploadArg* pParam = param;
char* path = NULL;
int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
void* pBackend = taskAcquireDb(pParam->dbRefId);
@ -387,10 +427,10 @@ int32_t uploadCheckpointData(void* param) {
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadCheckpoint(pParam->taskId, path);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint data:%s, checkpointId:%" PRId64, taskStr, path, pParam->chkpId);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId);
} else {
stDebug("s-task:%s backup checkpointId:%"PRId64" to remote succ", taskStr, pParam->chkpId);
stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path);
}
}
@ -403,19 +443,25 @@ int32_t uploadCheckpointData(void* param) {
for (int i = 0; i < size; i++) {
char* pName = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(pParam->taskId, pName);
stDebug("s-task:%s try to del file: %s", taskStr, pName);
if (code != 0) {
stDebug("s-task:%s failed to del file: %s", taskStr, pName);
break;
}
}
stDebug("s-task:%s remove redundant files done", taskStr);
}
taosArrayDestroyP(toDelFiles, taosMemoryFree);
stDebug("s-task:%s remove local checkpoint dir:%s", taskStr, path);
taosRemoveDir(path);
taosMemoryFree(path);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path);
taosRemoveDir(path);
} else {
stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId);
}
taosMemoryFree(path);
taosMemoryFree(pParam->taskId);
taosMemoryFree(pParam);
@ -446,7 +492,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
int64_t ckId = pTask->chkInfo.pActiveInfo->activeId;
const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
SStreamMeta* pMeta = pTask->pMeta;
@ -510,6 +556,211 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
stDebug("s-task:%s vgId:%d checkpoint-trigger monit start, ts:%" PRId64, pTask->id.idStr, vgId, now);
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger", pTask->id.idStr, vgId);
taosThreadMutexUnlock(&pTask->lock);
return;
}
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexLock(&pActiveInfo->lock);
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
bool recved = false;
for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
if (pInfo->nodeId == pReady->nodeId) {
recved = true;
break;
}
}
if (!recved) { // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
streamTaskOpenUpstreamInput(pTask, pInfo->taskId);
taosArrayPush(pNotSendList, pInfo);
}
}
// do send retrieve checkpoint trigger msg to upstream
doSendRetrieveTriggerMsg(pTask, pNotSendList);
taosThreadMutexUnlock(&pActiveInfo->lock);
// check every 100ms
if (taosArrayGetSize(pNotSendList) > 0) {
taosTmrReset(checkpointTriggerMonitorFn, 10000, pTask, streamTimer, &pActiveInfo->pCheckTmr);
}
taosArrayDestroy(pNotSendList);
}
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
int32_t code = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* pId = pTask->id.idStr;
for (int32_t i = 0; i < taosArrayGetSize(pNotSendList); i++) {
SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId);
continue;
}
pReq->head.vgId = htonl(pUpstreamTask->nodeId);
pReq->streamId = pTask->id.streamId;
pReq->downstreamTaskId = pTask->id.taskId;
pReq->downstreamNodeId = vgId;
pReq->upstreamTaskId = pUpstreamTask->taskId;
pReq->upstreamNodeId = pUpstreamTask->nodeId;
pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, pReq, sizeof(SRetrieveChkptTriggerReq));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
}
return TSDB_CODE_SUCCESS;
}
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__CK) {
return false;
}
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pInfo->lock);
if (!pInfo->dispatchTrigger) {
taosThreadMutexUnlock(&pInfo->lock);
return false;
}
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (pSendInfo->nodeId != downstreamNodeId) {
continue;
}
// has send trigger msg to downstream node,
if (pSendInfo->recved) {
stWarn("s-task:%s checkpoint-trigger msg send at:%"PRId64" and recv confirmed, checkpointId:%"PRId64 ", transId:%d",
pTask->id.idStr, pSendInfo->sendTs, pInfo->activeId, pInfo->transId);
} else {
stWarn("s-task:%s checkpoint-trigger send at:%"PRId64", checkpointId:%"PRId64", transId:%d", pTask->id.idStr,
pSendInfo->sendTs, pInfo->activeId, pInfo->transId);
}
taosThreadMutexUnlock(&pInfo->lock);
return true;
}
ASSERT(0);
return false;
}
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
*pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
*pTotal = 1;
} else {
*pTotal = streamTaskGetNumOfUpstream(pTask);
}
}
// record the dispatch checkpoint trigger info in the list
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
taosThreadMutexLock(&pInfo->lock);
// outputQ should be empty here
ASSERT(streamQueueGetNumOfItems(pTask->outputq.queue) == 1);
pInfo->dispatchTrigger = true;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
taosArrayPush(pInfo->pDispatchTriggerList, &p);
} else {
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
taosArrayPush(pInfo->pDispatchTriggerList, &p);
}
}
taosThreadMutexUnlock(&pInfo->lock);
}
int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t num = 0;
taosThreadMutexLock(&pInfo->lock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p->recved) {
num ++;
}
}
taosThreadMutexUnlock(&pInfo->lock);
return num;
}
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t taskId = 0;
taosThreadMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p->nodeId == vgId) {
ASSERT(p->recved == false);
p->recved = true;
p->recvTs = taosGetTimestampMs();
taskId = p->taskId;
break;
}
}
taosThreadMutexUnlock(&pInfo->lock);
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
int32_t total = streamTaskGetNumOfDownstream(pTask);
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
ASSERT(taskId != 0);
}
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
@ -576,7 +827,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
}
if (strlen(tsSnodeAddress) != 0) {
return uploadRsync(id, path);
return uploadByRsync(id, path);
} else if (tsS3StreamEnabled) {
return uploadCheckpointToS3(id, path);
}

View File

@ -23,12 +23,6 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
typedef struct {
int32_t upStreamTaskId;
SEpSet upstreamNodeEpset;
SRpcMsg msg;
} SStreamChkptReadyInfo;
static void doRetryDispatchData(void* param, void* tmrId);
static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
@ -85,12 +79,14 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
void* buf = NULL;
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) {
req->reqId = tGenIdPI64();
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
req->dstNodeId = pEpInfo->nodeId;
req->dstTaskId = pEpInfo->taskId;
int32_t len;
tEncodeSize(tEncodeStreamRetrieveReq, req, len, code);
if (code != 0) {
ASSERT(0);
@ -115,7 +111,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
if (code != 0) {
ASSERT(0);
rpcFreeCont(buf);
return code;
}
@ -124,15 +119,16 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
}
return code;
}
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
SRetrieveTableRsp* pRetrieve = NULL;
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
pRetrieve = taosMemoryCalloc(1, dataStrLen);
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
SRetrieveTableRsp* pRetrieve = taosMemoryCalloc(1, dataStrLen);
if (pRetrieve == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->useconds = 0;
@ -231,6 +227,28 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
pMsgInfo->dispatchMsgType = 0;
}
int32_t streamTaskBuildAndSendTriggerMsg(SStreamTask* pTask, const SStreamDataBlock* pData, int32_t dstTaskId,
int32_t vgId, SEpSet* pEpset) {
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
int32_t code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, dstTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < numOfBlocks; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
code = streamAddBlockIntoDispatchMsg(pDataBlock, pReq);
if (code != TSDB_CODE_SUCCESS) {
destroyDispatchMsg(pReq, 1);
return code;
}
}
return doSendDispatchMsg(pTask, pReq, vgId, pEpset);
}
static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
@ -357,8 +375,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
for (int32_t i = 0; i < numOfVgroups; i++) {
if (pDispatchMsg[i].blockNum > 0) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId);
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", id, pTask->info.selfChildId,
pDispatchMsg[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
@ -372,8 +390,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
}
}
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups,
msgId);
stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", id, numOfVgroups, msgId);
}
return code;
@ -562,7 +579,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}
if (pTask->chkInfo.dispatchCheckpointTrigger) {
if (pTask->chkInfo.pActiveInfo->dispatchTrigger) {
stDebug("s-task:%s already send checkpoint trigger, not dispatch anymore", id);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
return 0;
@ -590,6 +607,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} else { // todo handle build dispatch msg failed
}
if (pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskInitTriggerDispatchInfo(pTask);
}
int32_t retryCount = 0;
while (1) {
code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
@ -624,18 +645,24 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pReadyMsgList);
SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList;
taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
int32_t num = taosArrayGetSize(pList);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
for (int32_t i = 0; i < num; ++i) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, i);
tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
pInfo->upStreamTaskId);
}
taosArrayClear(pTask->pReadyMsgList);
taosArrayClear(pList);
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
num);
@ -644,21 +671,22 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
// this function is only invoked by source task, and send rsp to mnode
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
taosThreadMutexLock(&pTask->lock);
SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList;
taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
if (taosArrayGetSize(pTask->pReadyMsgList) == 1) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0);
if (taosArrayGetSize(pList) == 1) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, 0);
tmsgSendRsp(&pInfo->msg);
taosArrayClear(pTask->pReadyMsgList);
taosArrayClear(pList);
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
} else {
stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
}
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
return TSDB_CODE_SUCCESS;
}
@ -777,17 +805,34 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
SStreamChkptReadyInfo info = {0};
SStreamChkptReadyInfo info = {
.recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId};
streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pActiveInfo->lock);
int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList);
if (size > 0) {
ASSERT(size == 1);
SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
if (pReady->transId == pReq->transId) {
stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore",
pTask->id.idStr, pReq->checkpointId);
} else {
stError("s-task:%s checkpointId:%" PRId64 " transId:%d not completed, new transId:%d checkpointId:%" PRId64
" recv from mnode",
pTask->id.idStr, pReady->checkpointId, pReady->transId, pReq->transId, pReq->checkpointId);
ASSERT(0); // failed to handle it
}
} else {
taosArrayPush(pActiveInfo->pReadyMsgList, &info);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size);
}
taosArrayPush(pTask->pReadyMsgList, &info);
int32_t size = taosArrayGetSize(pTask->pReadyMsgList);
stDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, size);
taosThreadMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_SUCCESS;
}
@ -799,7 +844,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
return TSDB_CODE_SUCCESS;
}
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
SStreamCheckpointReadyMsg req = {0};
req.downstreamNodeId = pTask->pMeta->vgId;
@ -833,7 +878,14 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
ASSERT(req.upstreamTaskId != 0);
SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet};
SStreamChkptReadyInfo info = {
.upStreamTaskId = pInfo->taskId,
.upstreamNodeEpset = pInfo->epSet,
.nodeId = req.upstreamNodeId,
.recvTs = taosGetTimestampMs(),
.checkpointId = req.checkpointId,
};
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
@ -841,39 +893,65 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index,
req.upstreamNodeId);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pActiveInfo->lock);
bool recved = false;
int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList);
for (int32_t i = 0; i < size; ++i) {
SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p->nodeId == req.upstreamNodeId) {
if (p->checkpointId == req.checkpointId) {
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", ignore",
pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId);
} else {
stError("s-task:%s checkpointId:%" PRId64 " not completed, new checkpointId:%" PRId64 " recv",
pTask->id.idStr, p->checkpointId, checkpointId);
ASSERT(0); // failed to handle it
}
recved = true;
break;
}
}
taosArrayPush(pTask->pReadyMsgList, &info);
if (!recved) {
taosArrayPush(pActiveInfo->pReadyMsgList, &info);
}
taosThreadMutexUnlock(&pActiveInfo->lock);
return 0;
}
void streamClearChkptReadyMsg(SStreamTask* pTask) {
if (pTask->pReadyMsgList == NULL) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
if (pActiveInfo == NULL) {
return;
}
for (int i = 0; i < taosArrayGetSize(pTask->pReadyMsgList); i++) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i);
rpcFreeCont(pInfo->msg.pCont);
}
taosArrayClear(pTask->pReadyMsgList);
taosArrayClear(pActiveInfo->pReadyMsgList);
}
// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) {
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) {
ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId);
pTask->chkInfo.dispatchCheckpointTrigger = true;
stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
pTask->chkInfo.pActiveInfo->activeId == pTask->msgInfo.checkpointId) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pTask->msgInfo.transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, downstreamId, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
streamTaskSetTriggerDispatchConfirmed(pTask, downstreamNodeId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
@ -966,10 +1044,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
", set the current checkpoint failed, and send rsp to mnode",
id, pTask->chkInfo.checkpointingId);
id, pTask->chkInfo.pActiveInfo->activeId);
{ // send checkpoint failure msg to mnode directly
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; // record the latest failed checkpoint id
pTask->chkInfo.checkpointingId = pTask->chkInfo.checkpointingId;
pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; // record the latest failed checkpoint id
pTask->chkInfo.pActiveInfo->activeId = pTask->chkInfo.pActiveInfo->activeId;
streamTaskSendCheckpointSourceRsp(pTask);
}
} else {
@ -1035,7 +1113,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// now ready for next data output
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
} else {
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
}
}
}
@ -1096,7 +1174,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
if (pMeta->role == NODE_ROLE_FOLLOWER) {

View File

@ -426,7 +426,7 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) {
streamMetaReleaseTask(pMeta, pStreamTask);
return code;
} else {
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
stDebug("s-task:%s sink task halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pStreamTask);
}

View File

@ -1044,13 +1044,13 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId);
}
}

View File

@ -24,6 +24,8 @@
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
static SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
@ -70,12 +72,12 @@ static void freeItem(void* p) {
}
static void freeUpstreamItem(void* p) {
SStreamChildEpInfo** pInfo = p;
SStreamUpstreamEpInfo** pInfo = p;
taosMemoryFree(*pInfo);
}
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamUpstreamEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamUpstreamEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -254,7 +256,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
streamClearChkptReadyMsg(pTask);
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
if (pTask->msgInfo.pData != NULL) {
clearBufferedDispatchMsg(pTask);
@ -302,6 +303,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree((void*)pTask->id.idStr);
}
streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
pTask->chkInfo.pActiveInfo = NULL;
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
}
@ -414,6 +418,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pTask->chkInfo.pActiveInfo == NULL) {
pTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
}
return TSDB_CODE_SUCCESS;
}
@ -433,8 +441,12 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
}
}
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask) {
return taosArrayGetSize(pTask->upstreamInfo.pList);
}
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
SStreamUpstreamEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -453,7 +465,7 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->nodeId == nodeId) {
bool equal = isEpsetEqual(&pInfo->epSet, pEpSet);
if (!equal) {
@ -589,7 +601,7 @@ void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
int32_t size = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < size; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
pInfo->stage = -1;
}
@ -603,7 +615,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
}
for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
pInfo->dataAllowed = true;
}
@ -612,12 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
}
}
void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = true;
}
}
bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
}
@ -723,9 +742,9 @@ int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStr
pReq->dropRelHTask = dropRelHTask;
pReq->hStreamId = pHTaskId->streamId;
pReq->hTaskId = pHTaskId->taskId;
pReq->transId = pCheckpointInfo->transId;
pReq->transId = pCheckpointInfo->pActiveInfo->transId;
pReq->checkpointId = pCheckpointInfo->checkpointingId;
pReq->checkpointId = pCheckpointInfo->pActiveInfo->activeId;
pReq->checkpointVer = pCheckpointInfo->processedVer;
pReq->checkpointTs = pCheckpointInfo->startTs;
@ -860,10 +879,10 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
return 0;
}
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
SStreamUpstreamEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (pInfo->taskId == taskId) {
return pInfo;
}
@ -873,6 +892,24 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
return NULL;
}
SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
if (pTask->info.taskLevel == TASK_OUTPUT__FIXED_DISPATCH) {
if (pTask->outputInfo.fixedDispatcher.taskId == taskId) {
return &pTask->outputInfo.fixedDispatcher.epSet;
}
} else if (pTask->info.taskLevel == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* pList = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(pList, i);
if (pVgInfo->taskId == taskId) {
return &pVgInfo->epSet;
}
}
}
return NULL;
}
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
@ -914,4 +951,64 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) {
pTask->status.removeBackendFiles = true;
}
int32_t streamTaskGetActiveCheckpointInfo(const SStreamTask* pTask, int32_t* pTransId, int64_t* pCheckpointId) {
if (pTransId != NULL) {
*pTransId = pTask->chkInfo.pActiveInfo->transId;
}
if (pCheckpointId != NULL) {
*pCheckpointId = pTask->chkInfo.pActiveInfo->activeId;
}
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeCheckpointId) {
pTask->chkInfo.pActiveInfo->activeId = activeCheckpointId;
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId) {
pTask->chkInfo.pActiveInfo->transId = transId;
pTask->chkInfo.pActiveInfo->activeId = checkpointId;
pTask->chkInfo.pActiveInfo->failedId = checkpointId;
return TSDB_CODE_SUCCESS;
}
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() {
SActiveCheckpointInfo* pInfo = taosMemoryCalloc(1, sizeof(SActiveCheckpointInfo));
taosThreadMutexInit(&pInfo->lock, NULL);
pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
pInfo->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
return pInfo;
}
void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
if (pInfo == NULL) {
return;
}
taosThreadMutexDestroy(&pInfo->lock);
pInfo->pDispatchTriggerList = taosArrayDestroy(pInfo->pDispatchTriggerList);
pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList);
if (pInfo->pCheckTmr != NULL) {
taosTmrStop(pInfo->pCheckTmr);
pInfo->pCheckTmr = NULL;
}
taosMemoryFree(pInfo);
}
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0; // clear the checkpoint id
pInfo->failedId = 0;
pInfo->transId = 0;
pInfo->dispatchTrigger = false;
taosArrayClear(pInfo->pReadyMsgList);
taosArrayClear(pInfo->pDispatchTriggerList);
}

View File

@ -17,7 +17,7 @@
#include "streammsg.h"
#include "tstream.h"
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
@ -26,7 +26,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
return 0;
}
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
@ -481,7 +481,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
}
@ -557,7 +557,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES);
for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo));
if (pInfo == NULL) return -1;
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
taosMemoryFreeClear(pInfo);