fix(stream): restart tasks through an asynchronous callback instead of a synchronous wait.

This commit is contained in:
Haojun Liao 2023-12-18 16:33:44 +08:00
parent f3e0feb998
commit ea6e78cbaa
7 changed files with 52 additions and 19 deletions

View File

@ -32,5 +32,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t startStreamTasks(SStreamMeta* pMeta);
int32_t resetStreamTaskStatus(SStreamMeta* pMeta);
// Callback handed to streamMetaOpen() and stored as startInfo.completeFn; when restart requests
// are pending (startInfo.restartCount > 0) it consumes one and runs the task restart again.
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -460,14 +460,18 @@ struct SStreamTask {
char reserve[256];
};
// Function-pointer type of the callback kept in STaskStartInfo.completeFn: it receives the
// stream meta and returns an int32_t code.
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
// Bookkeeping for the procedure that starts (or restarts) the stream tasks of one stream meta.
// Each member is declared exactly once; member order is significant for the struct layout.
typedef struct STaskStartInfo {
  int64_t            startTs;
  int64_t            readyTs;
  int32_t            tasksWillRestart;
  int32_t            taskStarting;    // restart flag, sentinel to guard the restart procedure
  SHashObj*          pReadyTaskSet;   // tasks that are all ready for running stream processing
  SHashObj*          pFailedTaskSet;  // tasks that are done with the check-downstream process, may be successful or failed
  int64_t            elapsedTime;
  int32_t            restartCount;    // number of restart requests that arrived while a start was already in progress
  startComplete_fn_t completeFn;      // callback invoked once the start procedure has completed
} STaskStartInfo;
typedef struct STaskUpdateInfo {
@ -827,7 +831,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage);
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);

View File

@ -127,7 +127,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
}
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs());
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;

View File

@ -96,7 +96,8 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1);
int32_t vgId = TD_VID(pTq->pVnode);
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback);
if (pTq->pStreamMeta == NULL) {
return -1;
}

View File

@ -699,8 +699,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
}
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
@ -723,16 +723,17 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
streamMetaWLock(pMeta);
if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, %d", vgId, pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta);
terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
@ -791,7 +792,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
@ -808,4 +809,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
}
}
/**
 * Completion callback for the "start all tasks" procedure.
 *
 * When restart requests were recorded while a start was still in progress
 * (startInfo.restartCount > 0), one request is consumed and restartStreamTasks()
 * is run again. Otherwise the function only logs that the start has completed.
 *
 * @param pMeta stream meta whose startInfo is inspected and updated.
 * @return the code returned by restartStreamTasks() when a restart is triggered,
 *         TSDB_CODE_SUCCESS when no restart is pending.
 */
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;

  taosWLockLatch(&pMeta->lock);

  // nothing queued: the start procedure is finished
  if (pStartInfo->restartCount <= 0) {
    taosWUnLockLatch(&pMeta->lock);
    tqDebug("vgId:%d start all tasks completed", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  // consume exactly one pending restart request
  pStartInfo->restartCount -= 1;
  ASSERT(pStartInfo->taskStarting == 0);
  tqDebug("vgId:%d role:%d need to restart all tasks again, restartCounter:%d", pMeta->vgId, pMeta->role,
          pStartInfo->restartCount);
  taosWUnLockLatch(&pMeta->lock);

  // The latch is released before restarting because restartStreamTasks() acquires the meta
  // write lock itself. NOTE(review): assumes streamMetaWLock() guards the same pMeta->lock — confirm.
  // Propagate the restart result instead of unconditionally reporting success.
  return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
}

View File

@ -277,7 +277,9 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
return 0;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
startComplete_fn_t fn) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
@ -343,6 +345,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->stage = stage;
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
pMeta->startInfo.completeFn = fn;
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
@ -1258,8 +1261,9 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0
atomic_store_32(&pStartInfo->taskStarting, 0);
pStartInfo->taskStarting = 0;
}
void streamMetaRLock(SStreamMeta* pMeta) {

View File

@ -1083,6 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
bool restart = true;
streamMetaWLock(pMeta);
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
@ -1106,6 +1107,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
pStartInfo->completeFn(pMeta);
} else {
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
}