refactor: do some internal refactoring.

Haojun Liao 2023-07-11 11:41:37 +08:00
parent 62a763ead3
commit 571cccd737
1 changed file with 63 additions and 49 deletions


@@ -378,6 +378,56 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// set input
static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pCurrentVer, const char* id) {
void* pExecutor = pTask->exec.pExecutor;
const SStreamQueueItem* pItem = pInput;
if (pItem->type == STREAM_INPUT__GET_RES) {
const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
ASSERT((*pCurrentVer) < pSubmit->submit.ver);
(*pCurrentVer) = pSubmit->submit.ver;
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask,
numOfBlocks, pMerged->ver);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT((*pCurrentVer) < pMerged->ver);
(*pCurrentVer) = pMerged->ver;
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
} else {
ASSERT(0);
}
}
/**
* todo: the batch of blocks should be tuned dynamically, based on the total elapsed time of each batch; an
* appropriate batch of blocks should be handled within 5 to 10 seconds.
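The todo above proposes sizing each batch from how long the previous batch took to process. Below is a minimal sketch of that idea, in the spirit of the surrounding code; MIN_BATCH_SIZE, MAX_BATCH_SIZE, and the adjustBatchSize() helper are hypothetical names, not symbols introduced by this commit.

// Hypothetical sketch only: adapt the batch size so that processing one batch
// of blocks takes roughly 5 to 10 seconds.
#define MIN_BATCH_SIZE 1
#define MAX_BATCH_SIZE 1024

static int32_t adjustBatchSize(int32_t batchSize, int64_t elapsedMs) {
  if (elapsedMs < 5000) {          // batch finished too fast, take more blocks next round
    batchSize <<= 1;
  } else if (elapsedMs > 10000) {  // batch took too long, shrink it
    batchSize >>= 1;
  }

  if (batchSize < MIN_BATCH_SIZE) batchSize = MIN_BATCH_SIZE;
  if (batchSize > MAX_BATCH_SIZE) batchSize = MAX_BATCH_SIZE;
  return batchSize;
}

// Possible call site once a batch has been processed, e.g. in streamExecForAll():
//   int64_t el = taosGetTimestampMs() - st;
//   nextBatchSize = adjustBatchSize(nextBatchSize, el);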
@@ -418,52 +468,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
int64_t st = taosGetTimestampMs();
qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
const SStreamQueueItem* pItem = pInput;
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
int64_t currentVer = pTask->chkInfo.currentVer;
{
// set input
void* pExecutor = pTask->exec.pExecutor;
const SStreamQueueItem* pItem = pInput;
if (pItem->type == STREAM_INPUT__GET_RES) {
const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
ASSERT(currentVer < pSubmit->submit.ver);
currentVer = pSubmit->submit.ver;
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask,
numOfBlocks, pMerged->ver);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT(currentVer < pMerged->ver);
currentVer = pMerged->ver;
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
} else {
ASSERT(0);
}
}
doSetStreamInputBlock(pTask, pInput, &currentVer, id);
int64_t resSize = 0;
int32_t totalBlocks = 0;
@@ -523,7 +533,6 @@ int32_t streamTryExec(SStreamTask* pTask) {
return -1;
}
// todo the task should be commit here
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
@@ -551,17 +560,22 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (code == TSDB_CODE_SUCCESS) {
taosWLockLatch(&pTask->pMeta->lock);
ASSERT(pTask->chkInfo.keptCheckpointId < pTask->checkpointingId);
pTask->chkInfo.keptCheckpointId = pTask->checkpointingId;
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
qError("s-task:%s failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", ver:%" PRId64
", since %s",
pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, terrstr());
return -1;
} else {
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s commit after checkpoint generating", pTask->id.idStr);
}
qInfo("vgId:%d s-task:%s commit task status after checkpoint completed", pMeta->vgId, pTask->id.idStr);
qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64,
pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version);
} else {
// todo: let's retry send rsp to upstream/mnode
}
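For the retry noted in the todo above, a bounded retry loop with a short backoff is one possible shape. This is a hedged sketch only, not the project's actual mechanism: sendCheckpointRspToUpstream() is a hypothetical helper, and taosMsleep() is assumed to be the usual millisecond-sleep utility.

// Hypothetical sketch for the "retry send rsp" todo above. The sender
// function and the retry limits are illustrative, not real APIs.
static int32_t retrySendCheckpointRsp(SStreamTask* pTask) {
  const int32_t maxRetry = 5;
  for (int32_t i = 0; i < maxRetry; ++i) {
    int32_t code = sendCheckpointRspToUpstream(pTask);  // assumed sender, not a real API
    if (code == TSDB_CODE_SUCCESS) {
      return TSDB_CODE_SUCCESS;
    }
    qError("s-task:%s retry:%d send checkpoint rsp failed, code:%d", pTask->id.idStr, i + 1, code);
    taosMsleep(100 * (i + 1));  // linear backoff between attempts
  }
  return -1;
}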