refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-12-19 16:21:11 +08:00
parent ec3183b6a8
commit fee198f9ad
7 changed files with 11 additions and 8 deletions

View File

@ -31,7 +31,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t startStreamTasks(SStreamMeta* pMeta);
int32_t resetStreamTaskStatus(SStreamMeta* pMeta);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -150,7 +150,7 @@ FAIL:
}
int32_t sndInit(SSnode * pSnode) {
resetStreamTaskStatus(pSnode->pMeta);
tqStreamTaskResetStatus(pSnode->pMeta);
startStreamTasks(pSnode->pMeta);
return 0;
}

View File

@ -218,7 +218,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
resetStreamTaskStatus(pTq->pStreamMeta);
tqStreamTaskResetStatus(pTq->pStreamMeta);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
@ -703,7 +703,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
return code;
}
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
@ -763,7 +763,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
if (isLeader && !tsDisableStream) {
resetStreamTaskStatus(pMeta);
tqStreamTaskResetStatus(pMeta);
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);

View File

@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
resetStreamTaskStatus(pVnode->pTq->pStreamMeta);
tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
}
} else {

View File

@ -1270,18 +1270,22 @@ void streamMetaRLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosRLockLatch(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock);
}
void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosWLockLatch(&pMeta->lock);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock);
}
static void execHelper(struct SSchedMsg* pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle);

View File

@ -1083,7 +1083,6 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
bool restart = true;
streamMetaWLock(pMeta);
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;

View File

@ -72,7 +72,7 @@ TEST(osTest, osSystem) {
const int sysLen = 64;
char osSysName[sysLen];
int ret = taosGetOsReleaseName(osSysName, NULL, NULL, sysLen);
printf("os systeme name:%s\n", osSysName);
printf("os system name:%s\n", osSysName);
ASSERT_EQ(ret, 0);
}