Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
# Conflicts: # source/libs/stream/src/streamExec.c
This commit is contained in:
commit
a571663a5d
|
@ -16,7 +16,7 @@
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
|
||||||
// maximum allowed processed block batches. One block may include several submit blocks
|
// maximum allowed processed block batches. One block may include several submit blocks
|
||||||
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
||||||
|
|
||||||
static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId);
|
static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId);
|
||||||
|
|
||||||
|
@ -104,8 +104,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
numOfBlocks += 1;
|
numOfBlocks += 1;
|
||||||
|
|
||||||
qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64" dump results", pTask->id.idStr, pTask->info.selfChildId,
|
qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64 " dump results", pTask->id.idStr,
|
||||||
pRetrieveBlock->reqId);
|
pTask->info.selfChildId, pRetrieveBlock->reqId);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -329,7 +329,8 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
|
|
||||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
|
||||||
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
|
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr,
|
||||||
|
pStreamTask->id.idStr);
|
||||||
|
|
||||||
// todo handle stream task is dropped here
|
// todo handle stream task is dropped here
|
||||||
|
|
||||||
|
@ -355,7 +356,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// update the scan data range for source task.
|
// update the scan data range for source task.
|
||||||
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
||||||
", status:%s, sched-status:%d",
|
", status:%s, sched-status:%d",
|
||||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||||
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||||
} else {
|
} else {
|
||||||
|
@ -385,13 +386,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t batchSize = 0;
|
int32_t batchSize = 0;
|
||||||
SStreamQueueItem* pInput = NULL;
|
SStreamQueueItem* pInput = NULL;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
qDebug("s-task:%s start to extract data block from inputQ", id);
|
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||||
|
|
||||||
/*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize, id);
|
/*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &batchSize, id);
|
||||||
if (pInput == NULL) {
|
if (pInput == NULL) {
|
||||||
ASSERT(batchSize == 0);
|
ASSERT(batchSize == 0);
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||||
|
@ -457,7 +458,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
||||||
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||||
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
|
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
|
||||||
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput;
|
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
|
||||||
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
|
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -468,9 +469,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
int32_t totalBlocks = 0;
|
int32_t totalBlocks = 0;
|
||||||
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
|
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
|
||||||
id, el, resSize / 1048576.0, totalBlocks);
|
resSize / 1048576.0, totalBlocks);
|
||||||
|
|
||||||
// update the currentVer if processing the submit blocks.
|
// update the currentVer if processing the submit blocks.
|
||||||
if(currentVer > pTask->chkInfo.currentVer) {
|
if(currentVer > pTask->chkInfo.currentVer) {
|
||||||
|
@ -488,7 +489,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK_READY;
|
pTask->status.taskStatus = TASK_STATUS__CK_READY;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -525,19 +525,20 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
|
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr,
|
||||||
pTask->status.schedStatus);
|
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
|
if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
|
||||||
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
||||||
// todo extract method
|
// todo extract method
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
|
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
|
||||||
ASSERT(remain >= 0);
|
ASSERT(remain >= 0);
|
||||||
|
|
||||||
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
|
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
||||||
qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%"PRId64, pMeta->vgId);
|
qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%" PRId64 "", pMeta->vgId,
|
||||||
|
pTask->checkpointingId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
|
|
Loading…
Reference in New Issue