fix(stream): fix error.
This commit is contained in:
parent
a89ce1a20b
commit
ce721a0146
|
@ -272,6 +272,7 @@ typedef struct SStreamStatus {
|
||||||
int8_t schedStatus;
|
int8_t schedStatus;
|
||||||
int8_t keepTaskStatus;
|
int8_t keepTaskStatus;
|
||||||
bool transferState;
|
bool transferState;
|
||||||
|
bool appendTranstateBlock; // has append the transfer state data block already
|
||||||
int8_t timerActive; // timer is active
|
int8_t timerActive; // timer is active
|
||||||
int8_t pauseAllowed; // allowed task status to be set to be paused
|
int8_t pauseAllowed; // allowed task status to be set to be paused
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
|
@ -214,9 +214,11 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
|
||||||
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
|
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
|
||||||
", not scan wal anymore, set the transfer state flag",
|
", not scan wal anymore, set the transfer state flag",
|
||||||
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
|
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
|
||||||
pTask->status.transferState = true;
|
if (!pTask->status.appendTranstateBlock) {
|
||||||
|
pTask->status.appendTranstateBlock = true;
|
||||||
/*int32_t code = */streamSchedExec(pTask);
|
appendTranstateIntoInputQ(pTask);
|
||||||
|
/*int32_t code = */streamSchedExec(pTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -385,8 +385,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
destroyStreamDataBlock((SStreamDataBlock*) pItem);
|
destroyStreamDataBlock((SStreamDataBlock*) pItem);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
} else if (type == STREAM_INPUT__GET_RES) {
|
} else if (type == STREAM_INPUT__GET_RES) {
|
||||||
// use the default memory limit, refactor later.
|
// use the default memory limit, refactor later.
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
|
|
@ -391,11 +391,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
streamTaskFillHistoryFinished(pTask);
|
streamTaskFillHistoryFinished(pTask);
|
||||||
streamTaskEndScanWAL(pTask);
|
streamTaskEndScanWAL(pTask);
|
||||||
|
|
||||||
code = streamDoTransferStateToStreamTask(pTask);
|
code = streamDoTransferStateToStreamTask(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
|
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
|
||||||
code = streamDoTransferStateToStreamTask(pTask);
|
code = streamDoTransferStateToStreamTask(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
return code;
|
return code;
|
||||||
|
@ -484,9 +484,10 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
||||||
pTask->status.transferState = true;
|
pTask->status.transferState = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch the transtate block to downstream task immediately
|
// dispatch the tran-state block to downstream task immediately
|
||||||
if (level == TASK_LEVEL__SOURCE || level == TASK_LEVEL__AGG) {
|
int32_t type = pTask->outputInfo.type;
|
||||||
// pBlock-> = pTask->id.taskId;
|
if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) &&
|
||||||
|
(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH)) {
|
||||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||||
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -640,10 +641,10 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
|
|
||||||
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
|
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
|
||||||
// call this function (streamExecForAll) directly.
|
// call this function (streamExecForAll) directly.
|
||||||
code = streamExecForAll(pTask);
|
// code = streamExecForAll(pTask);
|
||||||
if (code < 0) {
|
// if (code < 0) {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
|
|
@ -427,6 +427,8 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTranstate->type = STREAM_INPUT__TRANS_STATE;
|
||||||
|
|
||||||
pBlock->info.type = STREAM_TRANS_STATE;
|
pBlock->info.type = STREAM_TRANS_STATE;
|
||||||
pBlock->info.rows = 1;
|
pBlock->info.rows = 1;
|
||||||
pBlock->info.childId = pTask->info.selfChildId;
|
pBlock->info.childId = pTask->info.selfChildId;
|
||||||
|
@ -440,7 +442,10 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
|
||||||
|
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue