refactor: do some internal refactor.
This commit is contained in:
parent
00f029e44f
commit
7cf90dde5c
|
@ -806,6 +806,10 @@ void streamMetaStartHb(SStreamMeta* pMeta);
|
||||||
void streamMetaInitForSnode(SStreamMeta* pMeta);
|
void streamMetaInitForSnode(SStreamMeta* pMeta);
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
|
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
|
||||||
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
|
void streamMetaWLock(SStreamMeta* pMeta);
|
||||||
|
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
|
@ -165,17 +165,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
|
||||||
|
|
||||||
// 2.save task
|
// 2.save task
|
||||||
taosWLockLatch(&pSnode->pMeta->lock);
|
streamMetaWLock(pSnode->pMeta);
|
||||||
|
|
||||||
bool added = false;
|
bool added = false;
|
||||||
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
|
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosWUnLockLatch(&pSnode->pMeta->lock);
|
streamMetaWUnLock(pSnode->pMeta);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
|
||||||
taosWUnLockLatch(&pSnode->pMeta->lock);
|
streamMetaWUnLock(pSnode->pMeta);
|
||||||
|
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
streamTaskGetStatus(pTask, &p);
|
streamTaskGetStatus(pTask, &p);
|
||||||
|
@ -195,14 +195,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||||
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
||||||
// commit the update
|
// commit the update
|
||||||
taosWLockLatch(&pSnode->pMeta->lock);
|
streamMetaWLock(pSnode->pMeta);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
|
||||||
qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
|
qDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
|
||||||
|
|
||||||
if (streamMetaCommit(pSnode->pMeta) < 0) {
|
if (streamMetaCommit(pSnode->pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pSnode->pMeta->lock);
|
streamMetaWUnLock(pSnode->pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1023,10 +1023,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
int64_t streamId = pTask->id.streamId;
|
int64_t streamId = pTask->id.streamId;
|
||||||
bool added = false;
|
bool added = false;
|
||||||
|
|
||||||
taosWLockLatch(&pStreamMeta->lock);
|
streamMetaWLock(pStreamMeta);
|
||||||
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
|
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
|
||||||
taosWUnLockLatch(&pStreamMeta->lock);
|
streamMetaWUnLock(pStreamMeta);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||||
|
@ -1406,14 +1406,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
||||||
// commit the update
|
// commit the update
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
|
||||||
|
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1724,7 +1724,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
int32_t total = 0;
|
int32_t total = 0;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
// set the initial value for generating check point
|
// set the initial value for generating check point
|
||||||
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
|
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
|
||||||
|
@ -1733,7 +1733,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
}
|
}
|
||||||
|
|
||||||
total = pMeta->numOfStreamTasks;
|
total = pMeta->numOfStreamTasks;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
|
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
|
||||||
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
|
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
|
||||||
|
@ -1804,8 +1804,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
// update the nodeEpset when it exists
|
// update the nodeEpset when it exists
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
||||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
|
@ -1814,8 +1813,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
||||||
req.taskId);
|
req.taskId);
|
||||||
rsp.code = TSDB_CODE_SUCCESS;
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return rsp.code;
|
return rsp.code;
|
||||||
|
@ -1838,22 +1836,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
||||||
req.transId);
|
req.transId);
|
||||||
rsp.code = TSDB_CODE_SUCCESS;
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return rsp.code;
|
return rsp.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
|
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
|
||||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
streamTaskResetStatus(pTask);
|
streamTaskResetStatus(pTask);
|
||||||
|
|
||||||
// continue after lock the meta again
|
// continue after lock the meta again
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
SStreamTask** ppHTask = NULL;
|
SStreamTask** ppHTask = NULL;
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
@ -1903,42 +1898,36 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (updateTasks < numOfTasks) {
|
if (updateTasks < numOfTasks) {
|
||||||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||||
updateTasks, (numOfTasks - updateTasks));
|
updateTasks, (numOfTasks - updateTasks));
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
} else {
|
} else {
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
||||||
pMeta->startInfo.startAllTasksFlag = 0;
|
pMeta->startInfo.startAllTasksFlag = 0;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
while (streamMetaTaskInTimer(pMeta)) {
|
while (streamMetaTaskInTimer(pMeta)) {
|
||||||
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
int32_t code = streamMetaReopen(pMeta);
|
int32_t code = streamMetaReopen(pMeta);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("vgId:%d failed to reopen stream meta", vgId);
|
tqError("vgId:%d failed to reopen stream meta", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
||||||
tqError("vgId:%d failed to load stream tasks", vgId);
|
tqError("vgId:%d failed to load stream tasks", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1951,8 +1940,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
vInfo("vgId:%d, follower node not start stream tasks", vgId);
|
vInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,9 +35,9 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
|
||||||
tqProcessSubmitReqForSubscribe(pTq);
|
tqProcessSubmitReqForSubscribe(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&pTq->pStreamMeta->lock);
|
streamMetaRLock(pTq->pStreamMeta);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
||||||
taosRUnLockLatch(&pTq->pStreamMeta->lock);
|
streamMetaRUnLock(pTq->pStreamMeta);
|
||||||
|
|
||||||
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
|
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
|
||||||
|
|
||||||
|
|
|
@ -1111,7 +1111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
// update the table list handle for each stream scanner/wal reader
|
// update the table list handle for each stream scanner/wal reader
|
||||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
streamMetaWLock(pTq->pStreamMeta);
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
|
pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
|
@ -1128,6 +1128,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
streamMetaWUnLock(pTq->pStreamMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,10 +38,10 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
|
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
|
||||||
|
|
||||||
if (shouldIdle) {
|
if (shouldIdle) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
int32_t times = (--pMeta->walScanCounter);
|
int32_t times = (--pMeta->walScanCounter);
|
||||||
ASSERT(pMeta->walScanCounter >= 0);
|
ASSERT(pMeta->walScanCounter >= 0);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (times <= 0) {
|
if (times <= 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -69,11 +69,11 @@ int32_t tqStartStreamTask(STQ* pTq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||||
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
||||||
pMeta->startInfo.startTs = taosGetTimestampMs();
|
pMeta->startInfo.startTs = taosGetTimestampMs();
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
// broadcast the check downstream tasks msg
|
// broadcast the check downstream tasks msg
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
@ -146,12 +146,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,7 +162,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
|
|
||||||
if (pMeta->walScanCounter > 1) {
|
if (pMeta->walScanCounter > 1) {
|
||||||
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
|
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,7 +172,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
|
|
||||||
// reset the counter value, since we do not launch the scan wal operation.
|
// reset the counter value, since we do not launch the scan wal operation.
|
||||||
pMeta->walScanCounter = 0;
|
pMeta->walScanCounter = 0;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,7 +180,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
if (pRunReq == NULL) {
|
if (pRunReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,7 +191,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -207,9 +207,9 @@ int32_t tqStopStreamTasks(STQ* pTq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
|
@ -410,9 +410,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
|
|
||||||
// clone the task list, to avoid the task update during scan wal files
|
// clone the task list, to avoid the task update during scan wal files
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
taosWLockLatch(&pStreamMeta->lock);
|
streamMetaWLock(pStreamMeta);
|
||||||
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
||||||
taosWUnLockLatch(&pStreamMeta->lock);
|
streamMetaWUnLock(pStreamMeta);
|
||||||
|
|
||||||
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
||||||
|
|
||||||
|
|
|
@ -555,13 +555,11 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
||||||
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
if (pMeta->startInfo.startAllTasksFlag) {
|
if (pMeta->startInfo.startAllTasksFlag) {
|
||||||
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,8 +576,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
|
|
|
@ -184,14 +184,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
|
|
||||||
{ // todo: remove this when the pipeline checkpoint generating is used.
|
{ // todo: remove this when the pipeline checkpoint generating is used.
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
if (pMeta->chkptNotReadyTasks == 0) {
|
if (pMeta->chkptNotReadyTasks == 0) {
|
||||||
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
//todo fix race condition: set the status and append checkpoint block
|
//todo fix race condition: set the status and append checkpoint block
|
||||||
|
@ -284,8 +283,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
@ -310,8 +308,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return -1;
|
return -1;
|
||||||
} else { // save the task
|
} else { // save the task
|
||||||
streamMetaSaveTask(pMeta, p);
|
streamMetaSaveTask(pMeta, p);
|
||||||
|
@ -332,8 +329,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
|
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -297,11 +297,11 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||||
|
|
||||||
// 2. save to disk
|
// 2. save to disk
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
|
stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
|
||||||
|
@ -357,9 +357,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||||
|
|
||||||
// 5. save to disk
|
// 5. save to disk
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
|
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
// 7. pause allowed.
|
// 7. pause allowed.
|
||||||
streamTaskEnablePause(pStreamTask);
|
streamTaskEnablePause(pStreamTask);
|
||||||
|
|
|
@ -447,20 +447,20 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask != NULL) {
|
if (ppTask != NULL) {
|
||||||
if (!streamTaskShouldStop(*ppTask)) {
|
if (!streamTaskShouldStop(*ppTask)) {
|
||||||
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
|
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
|
||||||
return *ppTask;
|
return *ppTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,8 +491,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
|
|
||||||
// pre-delete operation
|
// pre-delete operation
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
@ -509,40 +508,34 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
taosRLockLatch(&pMeta->lock);
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
if ((*ppTask)->status.timerActive == 0) {
|
if ((*ppTask)->status.timerActive == 0) {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
|
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
} else {
|
} else {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// let's do delete of stream task
|
// let's do delete of stream task
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
|
@ -573,16 +566,15 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -890,7 +882,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
|
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
|
||||||
|
|
||||||
SStreamHbMsg hbMsg = {0};
|
SStreamHbMsg hbMsg = {0};
|
||||||
taosRLockLatch(&pMeta->lock);
|
streamMetaRLock(pMeta);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
|
@ -963,7 +955,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
|
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
if (hasMnodeEpset) {
|
if (hasMnodeEpset) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1018,8 +1010,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
|
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
||||||
bool inTimer = false;
|
bool inTimer = false;
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1034,9 +1025,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
return inTimer;
|
return inTimer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1046,8 +1035,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
|
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
|
||||||
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
|
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1061,8 +1049,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
streamTaskStop(pTask);
|
streamTaskStop(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
// wait for the stream meta hb function stopping
|
// wait for the stream meta hb function stopping
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
|
@ -1101,4 +1088,22 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
|
||||||
taosHashClear(pStartInfo->pReadyTaskSet);
|
taosHashClear(pStartInfo->pReadyTaskSet);
|
||||||
pStartInfo->startAllTasksFlag = 0;
|
pStartInfo->startAllTasksFlag = 0;
|
||||||
pStartInfo->readyTs = 0;
|
pStartInfo->readyTs = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||||
|
stDebug("vgId:%d meta-rlock", pMeta->vgId);
|
||||||
|
taosRLockLatch(&pMeta->lock);
|
||||||
|
}
|
||||||
|
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||||
|
stDebug("vgId:%d meta-runlock", pMeta->vgId);
|
||||||
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
}
|
||||||
|
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
}
|
||||||
|
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||||
|
stDebug("vgId:%d meta-wunlock", pMeta->vgId);
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -583,10 +583,10 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
|
||||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||||
streamTaskSetSchedStatusInactive(pTask);
|
streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
streamMetaCommit(pMeta);
|
streamMetaCommit(pMeta);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
// history data scan in the stream time window finished, now let's enable the pause
|
// history data scan in the stream time window finished, now let's enable the pause
|
||||||
streamTaskEnablePause(pTask);
|
streamTaskEnablePause(pTask);
|
||||||
|
@ -624,8 +624,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
SLaunchHTaskInfo* pInfo = param;
|
SLaunchHTaskInfo* pInfo = param;
|
||||||
SStreamMeta* pMeta = pInfo->pMeta;
|
SStreamMeta* pMeta = pInfo->pMeta;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
|
@ -639,13 +638,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
|
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
|
@ -981,8 +978,7 @@ void streamTaskEnablePause(SStreamTask* pTask) {
|
||||||
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
|
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
streamMetaWLock(pMeta);
|
||||||
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
|
||||||
|
|
||||||
STaskId id = streamTaskExtractKey(pTask);
|
STaskId id = streamTaskExtractKey(pTask);
|
||||||
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
|
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
|
||||||
|
@ -1003,7 +999,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
|
||||||
pStartInfo->elapsedTime / 1000.0);
|
pStartInfo->elapsedTime / 1000.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
streamMetaWUnLock(pMeta);
|
||||||
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
SStreamMeta* pMeta = pStreamTask->pMeta;
|
SStreamMeta* pMeta = pStreamTask->pMeta;
|
||||||
pState->streamBackendRid = pMeta->streamBackendRid;
|
pState->streamBackendRid = pMeta->streamBackendRid;
|
||||||
// taosWLockLatch(&pMeta->lock);
|
// streamMetaWLock(pMeta);
|
||||||
taosThreadMutexLock(&pMeta->backendMutex);
|
taosThreadMutexLock(&pMeta->backendMutex);
|
||||||
void* uniqueId =
|
void* uniqueId =
|
||||||
taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||||
|
|
Loading…
Reference in New Issue