refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-15 00:45:45 +08:00
parent 5f486d6976
commit c4a49a7bd9
5 changed files with 148 additions and 137 deletions

View File

@ -209,29 +209,18 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
return code; return code;
} }
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); int32_t transId) {
if (pDataBlock == NULL) { int32_t code = 0;
return TSDB_CODE_INVALID_PARA; int32_t vgId = pTask->pMeta->vgId;
} int32_t taskLevel = pTask->info.taskLevel;
const char* id = pTask->id.idStr;
int64_t checkpointId = pDataBlock->info.version;
int32_t transId = pDataBlock->info.window.skey;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId > checkpointId) { if (pTask->chkInfo.checkpointId > checkpointId) {
stError("s-task:%s vgId:%d current checkpointId:%" PRId64 stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
@ -239,37 +228,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
" discard the checkpoint-trigger block", " discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId); id, vgId, checkpointId, transId, pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
if (pTask->chkInfo.checkpointId == checkpointId) { if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream { // send checkpoint-ready msg to upstream
SRpcMsg msg = {0}; SRpcMsg msg = {0};
SStreamUpstreamEpInfo* pInfo = NULL; SStreamUpstreamEpInfo* pInfo = NULL;
streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo); streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
if (pInfo == NULL) { if (pInfo == NULL) {
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = tmsgSendReq(&pInfo->epSet, &msg); code = tmsgSendReq(&pInfo->epSet, &msg);
if (code) {
stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code));
}
} }
} }
stWarn( stWarn(
"s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the " "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
"interrupted checkpoint", "the interrupted checkpoint",
id, vgId, pBlock->srcTaskId); id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
@ -278,9 +263,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard", " discard",
id, vgId, pActiveInfo->activeId, checkpointId); id, vgId, pActiveInfo->activeId, checkpointId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} else { // checkpointId == pActiveInfo->activeId } else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) { if (pActiveInfo->allUpstreamTriggerRecv == 1) {
@ -288,8 +270,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d", "checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId); id, vgId, checkpointId, transId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
@ -298,7 +278,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p == NULL) { if (p == NULL) {
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -306,9 +285,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard", ", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
} }
@ -316,7 +292,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
} }
} }
return 0;
}
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int64_t checkpointId = 0;
int32_t transId = 0;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
checkpointId = pDataBlock->info.version;
transId = pDataBlock->info.window.skey;
streamMutexLock(&pTask->lock);
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (code) {
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64 stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
", transId:%d current active checkpointId:%" PRId64, ", transId:%d current active checkpointId:%" PRId64,
@ -367,6 +369,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// before the next checkpoint. // before the next checkpoint.
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) { if (code) {
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
@ -675,10 +678,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
code = streamMetaCommit(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -458,9 +458,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
code = createMetaHbInfo(pRid, &pMeta->pHbInfo); code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
TSDB_CHECK_CODE(code, lino, _err); TSDB_CHECK_CODE(code, lino, _err);
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno);
code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
TSDB_CHECK_CODE(code, lino, _err); TSDB_CHECK_CODE(code, lino, _err);
@ -629,9 +626,6 @@ void streamMetaCloseImpl(void* arg) {
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
streamMutexDestroy(&pMeta->backendMutex); streamMutexDestroy(&pMeta->backendMutex);
taosCleanUpScheduler(pMeta->qHandle);
taosMemoryFree(pMeta->qHandle);
bkdMgtDestroy(pMeta->bkdChkptMgt); bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT; pMeta->role = NODE_ROLE_UNINIT;
@ -1261,40 +1255,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
streamMetaHbToMnode(pRid, NULL); streamMetaHbToMnode(pRid, NULL);
} }
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
} else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
QRY_PARAM_CHECK(pList); QRY_PARAM_CHECK(pList);
@ -1398,60 +1358,6 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
return 0; return 0;
} }
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list.
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
streamMetaRUnLock(pMeta);
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
return code;
}
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs;
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
if (code) {
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
}
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
if (code) {
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
}
}
}
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs) { int64_t startTs) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;

View File

@ -30,8 +30,8 @@ typedef struct STaskInitTs {
} STaskInitTs; } STaskInitTs;
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now); static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal); static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ); static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
// restore the checkpoint id by negotiating the latest consensus checkpoint id // restore the checkpoint id by negotiating the latest consensus checkpoint id
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
@ -505,3 +505,57 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts); stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
} }
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list.
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
streamMetaRUnLock(pMeta);
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
return code;
}
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs;
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
if (code) {
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
}
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
if (code) {
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
}
}
}

View File

@ -801,8 +801,8 @@ bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true; ret = true;
} }
streamMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
return ret; return ret;
} }

View File

@ -35,3 +35,54 @@ void streamMutexDestroy(TdThreadMutex *pMutex) {
stError("%p mutex destroy, code:%s", pMutex, tstrerror(code)); stError("%p mutex destroy, code:%s", pMutex, tstrerror(code));
} }
} }
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
} else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino) {
int32_t oldCode = atomic_val_compare_exchange_32(&pMeta->fatalInfo.code, 0, code);
if (oldCode == 0) {
pMeta->fatalInfo.ts = taosGetTimestampMs();
pMeta->fatalInfo.threadId = taosGetSelfPthreadId();
tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func));
pMeta->fatalInfo.line = lino;
stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino);
} else {
stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code);
}
}
int32_t streamGetFatalError(const SStreamMeta* pMeta) {
return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code);
}