Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2

This commit is contained in:
yihaoDeng 2023-08-23 14:27:35 +08:00
commit ccb675fe2b
4 changed files with 30 additions and 1 deletions

View File

@ -1888,6 +1888,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
{
taosWLockLatch(&pMeta->lock);
streamSetStatusNormal(pTask);
streamMetaSaveTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
}

View File

@ -71,6 +71,7 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);

View File

@ -848,7 +848,7 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory
return 0;
}
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
@ -879,6 +879,16 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
tEncodeCompleteHistoryDataMsg(&encoder, &msg);
tEncoderClear(&encoder);
*pBuffer = pBuf;
*pLen = len;
return 0;
}
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
void* pBuf = NULL;
int32_t len = 0;
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};

View File

@ -423,6 +423,23 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
int32_t taskLevel = pTask->info.taskLevel;
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) {
qError("s-task:%s not in scan-history status, return upstream:0x%x scan-history finish directly", pTask->id.idStr,
pReq->upstreamTaskId);
void* pBuf = NULL;
int32_t len = 0;
SRpcMsg msg = {0};
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
tmsgSendRsp(&msg);
qDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data from WAL", pTask->id.idStr,
pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId);
return 0;
}
// sink tasks do not send end of scan history msg to its upstream, which is agg task.
streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);