Merge branch 'enh/triggerChpt' into enh/triggerCheckPoint2

This commit is contained in:
yihaoDeng 2023-07-19 02:17:42 +00:00
commit b1f3dfe88f
5 changed files with 8 additions and 7 deletions

View File

@ -593,7 +593,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level
int32_t streamAggScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId);
// stream task meta
void streamMetaInit();

View File

@ -90,6 +90,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor);
streamTaskOpenAllUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
@ -292,7 +293,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
return -1;
}
// do process request
if (streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId) < 0) {
if (streamProcessScanHistoryFinishReq(pTask, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1;
}

View File

@ -1325,7 +1325,7 @@ int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
int32_t code = streamProcessScanHistoryFinishReq(pTask, req.taskId, req.childId);
int32_t code = streamProcessScanHistoryFinishReq(pTask, req.childId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code;
}

View File

@ -379,7 +379,7 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
return 0;
}
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t childId) {
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
ASSERT(left >= 0);
@ -391,8 +391,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, int32_t taskId, in
streamAggUpstreamScanHistoryFinish(pTask);
} else {
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
pTask->id.idStr, taskId, childId, left);
qDebug("s-task:%s receive scan-history data finish msg from upstream (index:%d), unfinished:%d",
pTask->id.idStr, childId, left);
}
}

View File

@ -118,7 +118,7 @@ echo "statusInterval 1" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
echo "tmrDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "jniDebugFlag 143" >> $TAOS_CFG