From e41da13d5bdde89f84299281d8c9dfe6f7022e4a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 7 Oct 2023 17:07:26 +0800 Subject: [PATCH] refact task backend --- source/libs/stream/src/streamBackendRocksdb.c | 63 +------------------ source/libs/stream/src/streamCheckpoint.c | 24 +++---- 2 files changed, 14 insertions(+), 73 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 07249862fb..97d6f0634a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1053,68 +1053,7 @@ _EXIT: taosReleaseRef(taskBackendWrapperId, refId); return -1; } -int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { - return 0; - // SStreamMeta* pMeta = arg; - // int64_t backendRid = pMeta->streamBackendRid; - // int64_t st = taosGetTimestampMs(); - // int32_t code = -1; - - // SArray* refs = taosArrayInit(16, sizeof(int64_t)); - - // rocksdb_column_family_handle_t** ppCf = NULL; - - // char* pChkpDir = NULL; - // char* pChkpIdDir = NULL; - // if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { - // taosArrayDestroy(refs); - // return code; - // } - - // SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); - // if (pHandle == NULL || pHandle->db == NULL) { - // goto _ERROR; - // } - - // // Get all cf and acquire cfWrappter - // int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); - // qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); - - // code = chkpPreFlushDb(pHandle->db, ppCf, nCf); - // if (code == 0) { - // code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); - // if (code != 0) { - // qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir); - // } else { - // qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, - // taosGetTimestampMs() - st); - // } - // } else { - // qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); - // } - // // release all ref to cfWrapper; - // for (int i = 0; i < taosArrayGetSize(refs); i++) { - // int64_t id = *(int64_t*)taosArrayGet(refs, i); - // taosReleaseRef(streamBackendCfWrapperId, id); - // } - // if (code == 0) { - // taosWLockLatch(&pMeta->chkpDirLock); - // taosArrayPush(pMeta->chkpSaved, &checkpointId); - // taosWUnLockLatch(&pMeta->chkpDirLock); - - // // delete obsolte checkpoint - // delObsoleteCheckpoint(arg, pChkpDir); - // pMeta->chkpId = checkpointId; - // } - - // _ERROR: - // taosReleaseRef(streamBackendId, backendRid); - // taosArrayDestroy(refs); - // taosMemoryFree(ppCf); - // taosMemoryFree(pChkpDir); - // taosMemoryFree(pChkpIdDir); - // return code; -} +int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); } SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)backend; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cc93d25fd5..63e3b94561 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -120,7 +120,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint pBlock->info.rows = 1; pBlock->info.childId = pTask->info.selfChildId; - pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; + pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; taosArrayPush(pChkpoint->blocks, pBlock); taosMemoryFree(pBlock); @@ -166,10 +166,10 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); - int64_t checkpointId = pDataBlock->info.version; + int64_t checkpointId = pDataBlock->info.version; const char* id = pTask->id.idStr; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; // set the task status pTask->checkpointingId = checkpointId; @@ -177,7 +177,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set task status pTask->status.taskStatus = TASK_STATUS__CK; - { // todo: remove this when the pipeline checkpoint generating is used. + { // todo: remove this when the pipeline checkpoint generating is used. SStreamMeta* pMeta = pTask->pMeta; taosWLockLatch(&pMeta->lock); @@ -189,10 +189,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc taosWUnLockLatch(&pMeta->lock); } - //todo fix race condition: set the status and append checkpoint block + // todo fix race condition: set the status and append checkpoint block int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || + pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info @@ -223,7 +224,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } else { qDebug( "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg " - "downstream", id, num); + "downstream", + id, num); // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure @@ -288,11 +290,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { // save the task streamMetaSaveTask(pMeta, p); - streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks + streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, - streamGetTaskStatusStr(prev)); + pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, + p->chkInfo.nextProcessVer, streamGetTaskStatusStr(prev)); } if (streamMetaCommit(pMeta) < 0) { @@ -320,7 +322,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); pMeta->totalTasks = 0; - streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); + streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId);