fix(stream): start stream task in case of scan history completing.
This commit is contained in:
parent
437eb93a4d
commit
b73444b291
|
@ -602,7 +602,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
// agg level
|
// agg level
|
||||||
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
|
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
|
||||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
|
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
|
||||||
|
|
||||||
void streamMetaInit();
|
void streamMetaInit();
|
||||||
void streamMetaCleanup();
|
void streamMetaCleanup();
|
||||||
|
|
|
@ -292,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// do process request
|
// do process request
|
||||||
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
|
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -894,8 +894,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId,
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d, scan-history:%d",
|
||||||
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);
|
vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
|
||||||
|
pTask->info.fillHistory);
|
||||||
|
|
||||||
// next valid version will add one
|
// next valid version will add one
|
||||||
pTask->chkInfo.version += 1;
|
pTask->chkInfo.version += 1;
|
||||||
|
@ -1072,7 +1073,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// do recovery step 1
|
// do recovery step 1
|
||||||
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
|
const char* pId = pTask->id.idStr;
|
||||||
|
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId,
|
||||||
|
streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
|
int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
|
||||||
|
@ -1085,15 +1088,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
streamSourceScanHistoryData(pTask);
|
streamSourceScanHistoryData(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr);
|
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
|
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
SVersionRange* pRange = NULL;
|
SVersionRange* pRange = NULL;
|
||||||
|
@ -1118,7 +1122,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// now we can stop the stream task execution
|
// now we can stop the stream task execution
|
||||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||||
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
|
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
|
||||||
pStreamTask->info.taskLevel, pTask->id.idStr);
|
pStreamTask->info.taskLevel, pId);
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
int64_t ver = pTask->dataRange.range.maxVer + 1;
|
int64_t ver = pTask->dataRange.range.maxVer + 1;
|
||||||
|
@ -1135,14 +1139,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pRange->minVer == pRange->maxVer) {
|
if (pRange->minVer == pRange->maxVer) {
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskRecoverSetAllStepFinished(pTask);
|
||||||
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest",
|
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest",
|
||||||
pTask->id.idStr);
|
pId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64
|
||||||
" do secondary scan-history-data after halt the related stream task:%s",
|
" do secondary scan-history-data after halt the related stream task:%s",
|
||||||
pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
|
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
|
||||||
|
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
st = taosGetTimestampMs();
|
st = taosGetTimestampMs();
|
||||||
|
@ -1153,7 +1157,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if(!streamTaskRecoverScanStep2Finished(pTask)) {
|
if(!streamTaskRecoverScanStep2Finished(pTask)) {
|
||||||
streamSourceScanHistoryData(pTask);
|
streamSourceScanHistoryData(pTask);
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr);
|
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1161,12 +1165,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
el = (taosGetTimestampMs() - st) / 1000.0;
|
el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el);
|
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
|
||||||
|
|
||||||
if (!pTask->status.transferState) {
|
if (!pTask->status.transferState) {
|
||||||
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
|
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
|
||||||
pTask->status.transferState = true;
|
pTask->status.transferState = true;
|
||||||
|
|
||||||
code = streamDispatchTransferStateMsg(pTask);
|
code = streamDispatchTransferStateMsg(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo handle error
|
// todo handle error
|
||||||
|
@ -1178,7 +1181,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamTryExec(pTask);
|
streamTryExec(pTask);
|
||||||
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
|
|
||||||
|
tqDebug("s-task:%s set status to be dropping", pId);
|
||||||
|
|
||||||
// transfer the ownership of executor state
|
// transfer the ownership of executor state
|
||||||
streamTaskReleaseState(pTask);
|
streamTaskReleaseState(pTask);
|
||||||
streamTaskReloadState(pStreamTask);
|
streamTaskReloadState(pStreamTask);
|
||||||
|
@ -1195,19 +1200,25 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// todo update the chkInfo version for current task.
|
// todo update the chkInfo version for current task.
|
||||||
// this task has an associated history stream task, so we need to scan wal from the end version of
|
// this task has an associated history stream task, so we need to scan wal from the end version of
|
||||||
// history scan. The current version of chkInfo.current is not updated during the history scan
|
// history scan. The current version of chkInfo.current is not updated during the history scan
|
||||||
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
|
|
||||||
if (pTask->historyTaskId.taskId == 0) {
|
if (pTask->historyTaskId.taskId == 0) {
|
||||||
pTask->dataRange.window.ekey = INT64_MAX;
|
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
|
||||||
pTask->dataRange.window.skey = INT64_MIN;
|
tqDebug("s-task:%s without associated stream task, reset the time window:%" PRId64 " - %" PRId64, pId,
|
||||||
tqDebug("s-task:%s without associated stream task, reset the time window:%"PRId64" - %"PRId64, pTask->id.idStr,
|
pWindow->skey, pWindow->ekey);
|
||||||
pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
|
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
|
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
|
||||||
", window:%" PRId64 " - %" PRId64,
|
", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
|
||||||
pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamTaskScanHistoryDataComplete(pTask);
|
code = streamTaskScanHistoryDataComplete(pTask);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
// let's start the stream task by extracting data from wal
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
tqStartStreamTasks(pTq);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1311,7 +1322,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// do process request
|
// do process request
|
||||||
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
|
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1402,9 +1413,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pTask->chkInfo.version);
|
pTask->chkInfo.version);
|
||||||
streamProcessRunReq(pTask);
|
streamProcessRunReq(pTask);
|
||||||
} else {
|
} else {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
// if (streamTaskShouldPause(&pTask->status)) {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
}
|
// }
|
||||||
|
|
||||||
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
|
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
|
||||||
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||||
|
@ -1543,19 +1554,23 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
char* msgStr = pMsg->pCont;
|
char* msgStr = pMsg->pCont;
|
||||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
SDecoder decoder;
|
||||||
|
|
||||||
SStreamRetrieveReq req;
|
SStreamRetrieveReq req;
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
int32_t taskId = req.dstTaskId;
|
int32_t taskId = req.dstTaskId;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||||
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||||
|
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
tDeleteStreamRetrieveReq(&req);
|
tDeleteStreamRetrieveReq(&req);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -170,21 +170,18 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
|
|
||||||
// enqueue
|
// enqueue
|
||||||
if (pData != NULL) {
|
if (pData != NULL) {
|
||||||
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->info.selfChildId,
|
qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId,
|
||||||
pReq->srcTaskId, pReq->reqId);
|
pReq->srcTaskId, pReq->reqId);
|
||||||
|
|
||||||
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
pData->type = STREAM_INPUT__DATA_RETRIEVE;
|
||||||
pData->srcVgId = 0;
|
pData->srcVgId = 0;
|
||||||
// decode
|
|
||||||
/*pData->blocks = pReq->data;*/
|
|
||||||
/*pBlock->sourceVer = pReq->sourceVer;*/
|
|
||||||
streamRetrieveReqToData(pReq, pData);
|
streamRetrieveReqToData(pReq, pData);
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
|
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) {
|
||||||
status = TASK_INPUT_STATUS__NORMAL;
|
status = TASK_INPUT_STATUS__NORMAL;
|
||||||
} else {
|
} else {
|
||||||
status = TASK_INPUT_STATUS__FAILED;
|
status = TASK_INPUT_STATUS__FAILED;
|
||||||
}
|
}
|
||||||
} else {
|
} else { // todo handle oom
|
||||||
/*streamTaskInputFail(pTask);*/
|
/*streamTaskInputFail(pTask);*/
|
||||||
/*status = TASK_INPUT_STATUS__FAILED;*/
|
/*status = TASK_INPUT_STATUS__FAILED;*/
|
||||||
}
|
}
|
||||||
|
@ -199,6 +196,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
pRsp->pCont = buf;
|
pRsp->pCont = buf;
|
||||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
|
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
|
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -314,7 +314,10 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
|
||||||
msg.info.noResp = 1;
|
msg.info.noResp = 1;
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
qDebug("s-task:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
|
|
||||||
|
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
|
qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
|
||||||
|
pReq->taskId, vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,8 +86,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
|
||||||
// check status
|
// check status
|
||||||
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
|
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
SHistDataRange* pRange = &pTask->dataRange;
|
SHistDataRange* pRange = &pTask->dataRange;
|
||||||
qDebug("s-task:%s check downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
STimeWindow* pWindow = &pRange->window;
|
||||||
pTask->id.idStr, pRange->range.minVer, pRange->range.maxVer, pRange->window.skey, pRange->window.ekey);
|
|
||||||
|
|
||||||
SStreamTaskCheckReq req = {
|
SStreamTaskCheckReq req = {
|
||||||
.streamId = pTask->id.streamId,
|
.streamId = pTask->id.streamId,
|
||||||
|
@ -98,13 +97,16 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
|
||||||
|
"-%" PRId64,
|
||||||
|
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
|
||||||
|
pWindow->skey, pWindow->ekey);
|
||||||
|
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
||||||
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
pTask->checkReqId = req.reqId;
|
pTask->checkReqId = req.reqId;
|
||||||
|
|
||||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId,
|
|
||||||
req.downstreamNodeId);
|
|
||||||
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
@ -113,14 +115,17 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
pTask->notReadyTasks = numOfVgs;
|
pTask->notReadyTasks = numOfVgs;
|
||||||
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
|
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
|
||||||
|
|
||||||
|
qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
||||||
|
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
taosArrayPush(pTask->checkReqIds, &req.reqId);
|
taosArrayPush(pTask->checkReqIds, &req.reqId);
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->info.nodeId,
|
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
|
||||||
req.downstreamTaskId, req.downstreamNodeId);
|
req.downstreamTaskId, req.downstreamNodeId, i);
|
||||||
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -303,9 +308,6 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
qDebug("s-task:%s send scan-history-data complete msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
|
|
||||||
pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
|
|
||||||
|
|
||||||
req.taskId = pTask->fixedEpDispatcher.taskId;
|
req.taskId = pTask->fixedEpDispatcher.taskId;
|
||||||
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -393,7 +395,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
|
||||||
// agg
|
// agg
|
||||||
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
|
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
|
||||||
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
||||||
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure", pTask->id.idStr,
|
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr,
|
||||||
pTask->numOfWaitingUpstream);
|
pTask->numOfWaitingUpstream);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -412,7 +414,7 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) {
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
|
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
|
@ -422,7 +424,8 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
||||||
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks);
|
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks);
|
||||||
streamAggUpstreamScanHistoryFinish(pTask);
|
streamAggUpstreamScanHistoryFinish(pTask);
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left);
|
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
|
||||||
|
pTask->id.idStr, taskId, childId, left);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -563,11 +566,12 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
|
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
|
||||||
/*code = */streamSetStatusNormal(pTask);
|
|
||||||
|
|
||||||
|
// ready to process data from inputQ
|
||||||
|
streamSetStatusNormal(pTask);
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
||||||
// todo check rsp
|
// todo check rsp, commit data
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -576,7 +576,6 @@ $loop_count = 0
|
||||||
|
|
||||||
print step 7
|
print step 7
|
||||||
|
|
||||||
|
|
||||||
sql create database test3 vgroups 6;
|
sql create database test3 vgroups 6;
|
||||||
sql use test3;
|
sql use test3;
|
||||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||||
|
|
Loading…
Reference in New Issue