fix(stream): fix checkpoint block not fetched bug.
This commit is contained in:
parent
2834c4f56c
commit
08f43b9d00
|
@ -136,7 +136,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
} SStreamTrigger, SStreamCheckpoint;
|
} SStreamTrigger;
|
||||||
|
|
||||||
typedef struct SStreamQueueNode SStreamQueueNode;
|
typedef struct SStreamQueueNode SStreamQueueNode;
|
||||||
|
|
||||||
|
|
|
@ -2189,7 +2189,7 @@ FETCH_NEXT_BLOCK:
|
||||||
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
|
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
|
||||||
|
|
||||||
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
|
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
|
||||||
SSDataBlock* pBlock = pData->pDataBlock;
|
SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CHECKPOINT) {
|
if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
streamScanOperatorSaveCheckpoint(pInfo);
|
streamScanOperatorSaveCheckpoint(pInfo);
|
||||||
|
|
|
@ -328,6 +328,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
pStreamTask->id.idStr);
|
pStreamTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->status.transferState = false; // reset this value, to avoid transfer state again
|
||||||
|
|
||||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||||
|
|
||||||
|
@ -415,8 +417,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
|
||||||
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
} else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
} else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
|
const SStreamDataBlock* pCheckpoint = (const SStreamDataBlock*)pInput;
|
||||||
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, pItem->type);
|
qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -431,16 +433,15 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t batchSize = 0;
|
int32_t numOfBlocks = 0;
|
||||||
SStreamQueueItem* pInput = NULL;
|
SStreamQueueItem* pInput = NULL;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
extractBlocksFromInputQ(pTask, &pInput, &batchSize);
|
extractBlocksFromInputQ(pTask, &pInput, &numOfBlocks);
|
||||||
if (pInput == NULL) {
|
if (pInput == NULL) {
|
||||||
ASSERT(batchSize == 0);
|
ASSERT(numOfBlocks == 0);
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
int32_t code = streamTransferStateToStreamTask(pTask);
|
||||||
pTask->status.transferState = false; // reset this value, to avoid transfer state again
|
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -462,7 +463,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT);
|
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT);
|
||||||
|
|
||||||
if (type == STREAM_INPUT__DATA_BLOCK) {
|
if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
|
qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks);
|
||||||
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
|
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -471,10 +472,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
const SStreamQueueItem* pItem = pInput;
|
const SStreamQueueItem* pItem = pInput;
|
||||||
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
|
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
|
||||||
|
|
||||||
int64_t ver = pTask->chkInfo.checkpointVer;
|
int64_t ver = pTask->chkInfo.checkpointVer;
|
||||||
doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.checkpointVer, id);
|
doSetStreamInputBlock(pTask, pInput, &ver, id);
|
||||||
|
|
||||||
int64_t resSize = 0;
|
int64_t resSize = 0;
|
||||||
int32_t totalBlocks = 0;
|
int32_t totalBlocks = 0;
|
||||||
|
@ -485,11 +486,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
resSize / 1048576.0, totalBlocks);
|
resSize / 1048576.0, totalBlocks);
|
||||||
|
|
||||||
// update the currentVer if processing the submit blocks.
|
// update the currentVer if processing the submit blocks.
|
||||||
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer);
|
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer);
|
||||||
|
|
||||||
if (ver != pTask->chkInfo.checkpointVer) {
|
if (ver != pTask->chkInfo.checkpointVer) {
|
||||||
qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
|
qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr,
|
||||||
pTask->chkInfo.checkpointVer);
|
pTask->chkInfo.checkpointVer, ver);
|
||||||
|
pTask->chkInfo.checkpointVer = ver;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
|
|
|
@ -174,7 +174,7 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
|
||||||
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -200,7 +200,7 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
|
|
||||||
// non sink task
|
// non sink task
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
|
||||||
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue