refactor code

This commit is contained in:
yihaoDeng 2023-05-15 05:29:56 +00:00
parent 9c0d2c87c6
commit 42562d388f
2 changed files with 11 additions and 9 deletions

View File

@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16 #define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
} }
bool streamTaskShouldPause(const SStreamStatus* pStatus) { bool streamTaskShouldPause(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE); return (status == TASK_STATUS__PAUSE);
} }
@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* pBlockList = pBlock->blocks; SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
@ -202,7 +202,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->blocks = pRes; qRes->blocks = pRes;
code = streamTaskOutput(pTask, qRes); code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosFreeQitem(pRes); taosFreeQitem(qRes);
return code; return code;
} }
@ -332,12 +332,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t ckId = 0; int64_t ckId = 0;
int64_t dataVer = 0; int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64, ", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock); taosWLockLatch(&pTask->pMeta->lock);

View File

@ -439,7 +439,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark);
void* pStVal = NULL; void* pStVal = NULL;
int32_t len = 0; int32_t len = 0;