refact task backend
This commit is contained in:
parent
3ebc7eef5f
commit
e41da13d5b
|
@ -1053,68 +1053,7 @@ _EXIT:
|
||||||
taosReleaseRef(taskBackendWrapperId, refId);
|
taosReleaseRef(taskBackendWrapperId, refId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
|
int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }
|
||||||
return 0;
|
|
||||||
// SStreamMeta* pMeta = arg;
|
|
||||||
// int64_t backendRid = pMeta->streamBackendRid;
|
|
||||||
// int64_t st = taosGetTimestampMs();
|
|
||||||
// int32_t code = -1;
|
|
||||||
|
|
||||||
// SArray* refs = taosArrayInit(16, sizeof(int64_t));
|
|
||||||
|
|
||||||
// rocksdb_column_family_handle_t** ppCf = NULL;
|
|
||||||
|
|
||||||
// char* pChkpDir = NULL;
|
|
||||||
// char* pChkpIdDir = NULL;
|
|
||||||
// if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
|
|
||||||
// taosArrayDestroy(refs);
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
|
|
||||||
// if (pHandle == NULL || pHandle->db == NULL) {
|
|
||||||
// goto _ERROR;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Get all cf and acquire cfWrappter
|
|
||||||
// int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
|
|
||||||
// qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
|
|
||||||
|
|
||||||
// code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
|
|
||||||
// if (code == 0) {
|
|
||||||
// code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir);
|
|
||||||
// if (code != 0) {
|
|
||||||
// qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
|
|
||||||
// } else {
|
|
||||||
// qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
|
|
||||||
// taosGetTimestampMs() - st);
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
|
|
||||||
// }
|
|
||||||
// // release all ref to cfWrapper;
|
|
||||||
// for (int i = 0; i < taosArrayGetSize(refs); i++) {
|
|
||||||
// int64_t id = *(int64_t*)taosArrayGet(refs, i);
|
|
||||||
// taosReleaseRef(streamBackendCfWrapperId, id);
|
|
||||||
// }
|
|
||||||
// if (code == 0) {
|
|
||||||
// taosWLockLatch(&pMeta->chkpDirLock);
|
|
||||||
// taosArrayPush(pMeta->chkpSaved, &checkpointId);
|
|
||||||
// taosWUnLockLatch(&pMeta->chkpDirLock);
|
|
||||||
|
|
||||||
// // delete obsolte checkpoint
|
|
||||||
// delObsoleteCheckpoint(arg, pChkpDir);
|
|
||||||
// pMeta->chkpId = checkpointId;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// _ERROR:
|
|
||||||
// taosReleaseRef(streamBackendId, backendRid);
|
|
||||||
// taosArrayDestroy(refs);
|
|
||||||
// taosMemoryFree(ppCf);
|
|
||||||
// taosMemoryFree(pChkpDir);
|
|
||||||
// taosMemoryFree(pChkpIdDir);
|
|
||||||
// return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
SListNode* streamBackendAddCompare(void* backend, void* arg) {
|
SListNode* streamBackendAddCompare(void* backend, void* arg) {
|
||||||
SBackendWrapper* pHandle = (SBackendWrapper*)backend;
|
SBackendWrapper* pHandle = (SBackendWrapper*)backend;
|
||||||
|
|
|
@ -120,7 +120,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
|
||||||
pBlock->info.rows = 1;
|
pBlock->info.rows = 1;
|
||||||
pBlock->info.childId = pTask->info.selfChildId;
|
pBlock->info.childId = pTask->info.selfChildId;
|
||||||
|
|
||||||
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
|
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
|
||||||
taosArrayPush(pChkpoint->blocks, pBlock);
|
taosArrayPush(pChkpoint->blocks, pBlock);
|
||||||
|
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
|
@ -166,10 +166,10 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
|
||||||
|
|
||||||
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||||
int64_t checkpointId = pDataBlock->info.version;
|
int64_t checkpointId = pDataBlock->info.version;
|
||||||
|
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// set the task status
|
// set the task status
|
||||||
pTask->checkpointingId = checkpointId;
|
pTask->checkpointingId = checkpointId;
|
||||||
|
@ -177,7 +177,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
// set task status
|
// set task status
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK;
|
pTask->status.taskStatus = TASK_STATUS__CK;
|
||||||
|
|
||||||
{ // todo: remove this when the pipeline checkpoint generating is used.
|
{ // todo: remove this when the pipeline checkpoint generating is used.
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
@ -189,10 +189,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
//todo fix race condition: set the status and append checkpoint block
|
// todo fix race condition: set the status and append checkpoint block
|
||||||
int32_t taskLevel = pTask->info.taskLevel;
|
int32_t taskLevel = pTask->info.taskLevel;
|
||||||
if (taskLevel == TASK_LEVEL__SOURCE) {
|
if (taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
|
||||||
|
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId);
|
qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId);
|
||||||
continueDispatchCheckpointBlock(pBlock, pTask);
|
continueDispatchCheckpointBlock(pBlock, pTask);
|
||||||
} else { // only one task exists, no need to dispatch downstream info
|
} else { // only one task exists, no need to dispatch downstream info
|
||||||
|
@ -223,7 +224,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
} else {
|
} else {
|
||||||
qDebug(
|
qDebug(
|
||||||
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
|
"s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg "
|
||||||
"downstream", id, num);
|
"downstream",
|
||||||
|
id, num);
|
||||||
|
|
||||||
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
|
||||||
// can start local checkpoint procedure
|
// can start local checkpoint procedure
|
||||||
|
@ -288,11 +290,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
|
|
||||||
// save the task
|
// save the task
|
||||||
streamMetaSaveTask(pMeta, p);
|
streamMetaSaveTask(pMeta, p);
|
||||||
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
|
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
|
||||||
qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
|
qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
|
||||||
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
|
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
|
||||||
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
|
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer,
|
||||||
streamGetTaskStatusStr(prev));
|
p->chkInfo.nextProcessVer, streamGetTaskStatusStr(prev));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
|
@ -320,7 +322,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
||||||
pMeta->totalTasks = 0;
|
pMeta->totalTasks = 0;
|
||||||
|
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pTask->pBackend, pTask->checkpointingId);
|
||||||
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
||||||
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
|
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
|
||||||
pTask->checkpointingId);
|
pTask->checkpointingId);
|
||||||
|
|
Loading…
Reference in New Issue