Merge pull request #22424 from taosdata/fix/liaohj
refactor: exec directly, not asynchronously.
commit 776b32aeae
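In short, the fill-history / WAL-scan path no longer hands control back to the async scheduler once scan-history step 2 ends: streamTaskEndScanWAL() is deleted, tqProcessTaskScanHistory() now appends the transfer-state block and calls streamTryExec() in place, and appendTranstateIntoInputQ() no longer resets the sched-status or calls streamSchedExec(). A minimal, self-contained sketch of that control-flow change follows; every type and helper in it is a hypothetical stand-in, not a real TDengine API.

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the stream-task structures and helpers; not the real TDengine APIs. */
typedef struct {
  const char *idStr;
  bool        transtateQueued;
} DemoTask;

static void demoAppendTranstateIntoInputQ(DemoTask *pTask) {
  pTask->transtateQueued = true; /* enqueue the transfer-state block */
  printf("s-task:%s transfer-state block appended to inputQ\n", pTask->idStr);
}

static void demoTryExec(DemoTask *pTask) {
  /* drain the input queue synchronously, in the caller's thread */
  printf("s-task:%s exec directly, transtateQueued=%d\n", pTask->idStr, (int)pTask->transtateQueued);
}

/* After this commit: scan-history step 2 ends, append the block, then execute in place
 * instead of leaving it to the async scheduler (streamSchedExec). */
static void demoEndScanWal(DemoTask *pTask) {
  demoAppendTranstateIntoInputQ(pTask);
  demoTryExec(pTask); /* exec directly */
}

int main(void) {
  DemoTask task = {.idStr = "0x2a", .transtateQueued = false};
  demoEndScanWal(&task);
  return 0;
}

Executing in the caller's thread removes the queue/scheduler round-trip between finishing step 2 and processing the transfer-state block; the sched-status guard inside streamTryExec() (illustrated after the streamTryExec hunk below) still keeps execution single-entry.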
@@ -593,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
 bool streamTaskShouldStop(const SStreamStatus* pStatus);
 bool streamTaskShouldPause(const SStreamStatus* pStatus);
 bool streamTaskIsIdle(const SStreamTask* pTask);
-int32_t streamTaskEndScanWAL(SStreamTask* pTask);

 SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
 int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
@@ -1274,6 +1274,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
 pTask->tsInfo.step2Start = taosGetTimestampMs();
 qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
 appendTranstateIntoInputQ(pTask);
+streamTryExec(pTask);  // exec directly
 } else {
 STimeWindow* pWindow = &pTask->dataRange.window;
 tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
@@ -1525,7 +1526,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
 if (pTask) {
 streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
 streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-return 0;
+return TSDB_CODE_SUCCESS;
 } else {
 tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
 return TSDB_CODE_INVALID_MSG;
@@ -339,8 +339,12 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
 void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
 int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);

-extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
-tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
+code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
+if (code != TSDB_CODE_SUCCESS) {
+tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
+} else {
+tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
+}
 } else {
 ASSERT(0);
 }
@@ -211,12 +211,13 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {

 static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
 const char* id = pTask->id.idStr;
+int64_t maxVer = pTask->dataRange.range.maxVer;

 if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
 if (!pTask->status.appendTranstateBlock) {
 qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
 ", not scan wal anymore, add transfer-state block into inputQ",
-id, ver, pTask->dataRange.range.maxVer);
+id, ver, maxVer);

 double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
 qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
@@ -224,7 +225,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
 /*int32_t code = */streamSchedExec(pTask);
 } else {
 qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
-id, ver, pTask->dataRange.range.maxVer);
+id, ver, maxVer);
 }
 }
 }
@@ -757,11 +757,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
 qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
 ASSERT(pTask->info.fillHistory == 1);
 code = streamTransferStateToStreamTask(pTask);
-if (code != TSDB_CODE_SUCCESS) {
-atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
-return code;
+if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
 }

+streamFreeQitem(pTask->msgInfo.pData);
+return TSDB_CODE_SUCCESS;
 }

 pTask->msgInfo.retryCount = 0;
@@ -292,9 +292,20 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {

 SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
 if (pStreamTask == NULL) {
-// todo: destroy the fill-history task here
-qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
-pTask->streamTaskId.taskId);
+qError(
+"s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
+"fill-history task",
+pTask->id.idStr, pTask->streamTaskId.taskId);
+
+// 1. free it and remove fill-history task from disk meta-store
+streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
+
+// 2. save to disk
+taosWLockLatch(&pMeta->lock);
+if (streamMetaCommit(pMeta) < 0) {
+// persist to disk
+}
+taosWUnLockLatch(&pMeta->lock);
 return TSDB_CODE_STREAM_TASK_NOT_EXIST;
 } else {
 qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
@@ -334,9 +345,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
 qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
 }

-// todo check the output queue for fill-history task, and wait for it complete
-
-
 // 1. expand the query time window for stream task of WAL scanner
 pTimeWindow->skey = INT64_MIN;
 qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
@@ -390,15 +398,10 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
 int32_t level = pTask->info.taskLevel;
 if (level == TASK_LEVEL__SOURCE) {
 streamTaskFillHistoryFinished(pTask);
+}
+
+if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
 code = streamDoTransferStateToStreamTask(pTask);
-if (code != TSDB_CODE_SUCCESS) { // todo handle this
-return code;
-}
-} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
-code = streamDoTransferStateToStreamTask(pTask);
-if (code != TSDB_CODE_SUCCESS) { // todo handle this
-return code;
-}
 }

 return code;
@@ -522,6 +525,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
 ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
 }

+// agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
 if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
 pBlock->srcVgId = pTask->pMeta->vgId;
 code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
@@ -530,16 +534,21 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
 } else {
 streamFreeQitem((SStreamQueueItem*)pBlock);
 }
+} else { // level == TASK_LEVEL__SINK
+streamFreeQitem((SStreamQueueItem*)pBlock);
 }
 } else { // non-dispatch task, do task state transfer directly
-qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
-
 streamFreeQitem((SStreamQueueItem*)pBlock);
-ASSERT(pTask->info.fillHistory == 1);
-code = streamTransferStateToStreamTask(pTask);
+if (level != TASK_LEVEL__SINK) {
+qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
+ASSERT(pTask->info.fillHistory == 1);
+code = streamTransferStateToStreamTask(pTask);

 if (code != TSDB_CODE_SUCCESS) {
 atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
+}
+} else {
+qDebug("s-task:%s sink task does not transfer state", id);
 }
 }

@@ -642,16 +651,6 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
 pTask->status.taskStatus == TASK_STATUS__DROPPING);
 }

-int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
-const char* id = pTask->id.idStr;
-double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
-qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
-
-// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
-appendTranstateIntoInputQ(pTask);
-return TSDB_CODE_SUCCESS;
-}
-
 int32_t streamTryExec(SStreamTask* pTask) {
 // this function may be executed by multi-threads, so status check is required.
 int8_t schedStatus =
@@ -667,36 +666,14 @@ int32_t streamTryExec(SStreamTask* pTask) {
 }

 // todo the task should be commit here
-// if (taosQueueEmpty(pTask->inputQueue->queue)) {
-// fill-history WAL scan has completed
-// if (pTask->status.transferState) {
-// code = streamTransferStateToStreamTask(pTask);
-// if (code != TSDB_CODE_SUCCESS) {
-// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
-// return code;
-// }
-
-// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
-// call this function (streamExecForAll) directly.
-// code = streamExecForAll(pTask);
-// if (code < 0) {
-// do nothing
-// }
-// }
-
-// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
-// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
-// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
-// } else {
 atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
 qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
 pTask->status.schedStatus);

 if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
 streamTaskShouldPause(&pTask->status))) {
 streamSchedExec(pTask);
 }
-// }
 } else {
 qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
 streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
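Because streamTryExec() can now be entered directly from the WAL-scan path as well as from the scheduler, the sched-status transition is what keeps it single-entry, and the trailing check re-schedules only while the input queue still holds data and the task is neither stopping nor pausing. A rough, self-contained illustration of that guard-and-requeue pattern, using hypothetical names and C11 atomics in place of the project's atomic_* helpers:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

enum { SCHED_INACTIVE = 1, SCHED_WAITING = 2, SCHED_ACTIVE = 3 };

/* Hypothetical task model; only the fields needed to show the guard. */
typedef struct {
  _Atomic int schedStatus;
  int         pendingItems; /* stand-in for the input-queue depth */
  bool        shouldStop;
  bool        shouldPause;
} GuardedTask;

static void demoExecForAll(GuardedTask *t) {
  /* process a bounded batch; anything left behind must trigger a re-schedule */
  t->pendingItems = (t->pendingItems > 2) ? (t->pendingItems - 2) : 0;
}

static void demoSchedExec(GuardedTask *t) {
  printf("re-schedule: %d item(s) still queued\n", t->pendingItems);
}

static int demoTryExec(GuardedTask *t) {
  /* only one caller wins the WAITING -> ACTIVE transition */
  int expected = SCHED_WAITING;
  if (!atomic_compare_exchange_strong(&t->schedStatus, &expected, SCHED_ACTIVE)) {
    printf("already started to exec by another thread, sched-status:%d\n", expected);
    return 0;
  }

  demoExecForAll(t);
  atomic_store(&t->schedStatus, SCHED_INACTIVE);
  printf("exec completed, sched-status:%d\n", SCHED_INACTIVE);

  /* data queued while ACTIVE could not be scheduled; check again before returning */
  if (!(t->pendingItems == 0 || t->shouldStop || t->shouldPause)) {
    demoSchedExec(t);
  }
  return 0;
}

int main(void) {
  GuardedTask task = {.schedStatus = SCHED_WAITING, .pendingItems = 3, .shouldStop = false, .shouldPause = false};
  return demoTryExec(&task);
}

The next hunk removes the matching streamSchedExec() call (and the sched-status reset) from appendTranstateIntoInputQ(), since the caller now drives execution itself.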
@@ -400,11 +400,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
 }

 pTask->status.appendTranstateBlock = true;
-
-qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
-pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
-streamSchedExec(pTask);
-
 return TSDB_CODE_SUCCESS;
 }