enh(stream): halt the initial stream tasks when fill-history is set for count window agg
This commit is contained in:
parent
49ddc87667
commit
7a18e5910c
|
@ -3184,18 +3184,11 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t leftForVer;
|
||||
int64_t resetRelHalt; // reset related stream task halt status
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
} SVDropStreamTaskReq;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int64_t dataVer;
|
||||
} SVStreamTaskVerUpdateReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
} SVDropStreamTaskRsp;
|
||||
|
|
|
@ -796,7 +796,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
|
|||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock);
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
|
||||
|
||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
|
@ -882,7 +882,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
|
|||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
|
||||
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
|
||||
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
|
||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt);
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||
int8_t isSucceed);
|
||||
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
|
||||
|
|
|
@ -30,6 +30,28 @@ extern bool tsDeployOnSnode;
|
|||
static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
||||
SEpSet* pEpset, bool isFillhistory);
|
||||
|
||||
static bool hasCountWindowNode(SPhysiNode* pNode) {
|
||||
if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
|
||||
return true;
|
||||
} else {
|
||||
size_t size = LIST_LENGTH(pNode->pChildren);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pNode->pChildren, i);
|
||||
if (hasCountWindowNode(pChild)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static bool countWindowStreamTask(SSubplan* pPlan) {
|
||||
SPhysiNode* pNode = pPlan->pNode;
|
||||
return hasCountWindowNode(pNode);
|
||||
}
|
||||
|
||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||
int64_t watermark, int64_t deleteMark) {
|
||||
SNode* pAst = NULL;
|
||||
|
@ -312,6 +334,14 @@ static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList,
|
|||
return terrno;
|
||||
}
|
||||
|
||||
bool hasCountWindowNode = countWindowStreamTask(plan);
|
||||
if (hasCountWindowNode && (!fillHistory) && hasFillHistory) {
|
||||
SStreamStatus* pStatus = &pTask->status;
|
||||
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set",
|
||||
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT));
|
||||
pStatus->taskStatus = TASK_STATUS__HALT;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
|
||||
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
|
||||
streamTaskSetUpstreamInfo(pSinkTask, pTask);
|
||||
|
|
|
@ -841,22 +841,24 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
|||
}
|
||||
|
||||
char* p = streamTaskGetStatus(pTask)->name;
|
||||
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
|
||||
|
||||
if (pTask->info.fillHistory) {
|
||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
" nextProcessVer:%" PRId64
|
||||
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64
|
||||
" ms, inputVer:%" PRId64,
|
||||
" child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
|
||||
"trigger:%" PRId64 " ms, inputVer:%" PRId64,
|
||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer);
|
||||
} else {
|
||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
tqInfo(
|
||||
"vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
|
||||
" nextProcessVer:%" PRId64
|
||||
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64
|
||||
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64
|
||||
" ms, inputVer:%" PRId64,
|
||||
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
|
||||
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
|
||||
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
|
||||
}
|
||||
|
||||
|
@ -1016,8 +1018,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
|
||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||
|
||||
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
tqDebug("s-task:%s fill-history task set status to be dropping and drop it", id);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
|
|
@ -602,6 +602,10 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// drop the stream task now
|
||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
|
||||
|
|
|
@ -512,7 +512,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
|
||||
|
||||
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1);
|
||||
} else {
|
||||
stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId);
|
||||
}
|
||||
|
|
|
@ -328,7 +328,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
id, (int32_t) pTask->streamTaskId.taskId);
|
||||
|
||||
// 1. free it and remove fill-history task from disk meta-store
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
|
||||
|
||||
// 2. save to disk
|
||||
streamMetaWLock(pMeta);
|
||||
|
|
|
@ -725,9 +725,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
|
||||
// it is an fill-history task, remove the related stream task's id that points to it
|
||||
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
streamTaskClearHTaskAttr(pTask, false);
|
||||
}
|
||||
|
||||
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
|
||||
|
|
|
@ -391,6 +391,16 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
|||
int64_t startTs = pTask->execInfo.start;
|
||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
|
||||
|
||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
||||
|
||||
// halt it self for count window stream task until the related
|
||||
// fill history task completd.
|
||||
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||
}
|
||||
|
||||
// start the related fill-history task, when current task is ready
|
||||
// not invoke in success callback due to the deadlock.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
|
@ -804,7 +814,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
|
||||
// check stream task status in the first place.
|
||||
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
|
||||
if (pStatus->state != TASK_STATUS__READY) {
|
||||
if (pStatus->state != TASK_STATUS__READY && pStatus->state != TASK_STATUS__HALT) {
|
||||
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
|
||||
pStatus->name);
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
|||
|
||||
pTask->id.idStr = taosStrdup(buf);
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
|
||||
pTask->status.taskStatus = fillHistory? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY;
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||
|
||||
|
@ -126,7 +126,6 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
|
|||
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
|
||||
/*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/
|
||||
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pInfo->stage) < 0) return -1;
|
||||
return 0;
|
||||
|
@ -136,7 +135,6 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
|
|||
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
|
||||
/*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/
|
||||
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pInfo->stage) < 0) return -1;
|
||||
return 0;
|
||||
|
@ -294,7 +292,6 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
|
||||
int64_t ver;
|
||||
int64_t skip64;
|
||||
int8_t skip8;
|
||||
int32_t skip32;
|
||||
|
@ -648,7 +645,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
|||
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_STOP);
|
||||
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) {
|
||||
while (!streamTaskIsIdle(pTask)) {
|
||||
stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id,
|
||||
pTask->info.taskLevel);
|
||||
taosMsleep(100);
|
||||
|
@ -755,7 +752,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
|||
return status;
|
||||
}
|
||||
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
|
||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
|
@ -773,6 +770,12 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
|
|||
|
||||
taosThreadMutexLock(&(*ppStreamTask)->lock);
|
||||
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
|
||||
|
||||
if (resetRelHalt) {
|
||||
(*ppStreamTask)->status.taskStatus = TASK_STATUS__READY;
|
||||
stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId);
|
||||
}
|
||||
|
||||
streamMetaSaveTask(pMeta, *ppStreamTask);
|
||||
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
|
||||
}
|
||||
|
@ -784,7 +787,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
|
||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t resetRelHalt) {
|
||||
SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -794,6 +797,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
|
|||
pReq->head.vgId = vgId;
|
||||
pReq->taskId = pTaskId->taskId;
|
||||
pReq->streamId = pTaskId->streamId;
|
||||
pReq->resetRelHalt = resetRelHalt;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
|
||||
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
|
||||
|
|
Loading…
Reference in New Issue