refactor: add some logs.
This commit is contained in:
parent
385e699cc2
commit
00f029e44f
|
@ -762,7 +762,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
|
||||||
void streamTaskResume(SStreamTask* pTask);
|
void streamTaskResume(SStreamTask* pTask);
|
||||||
void streamTaskDisablePause(SStreamTask* pTask);
|
|
||||||
void streamTaskEnablePause(SStreamTask* pTask);
|
void streamTaskEnablePause(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
|
|
|
@ -1805,6 +1805,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// update the nodeEpset when it exists
|
// update the nodeEpset when it exists
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
||||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
|
@ -1814,6 +1815,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
req.taskId);
|
req.taskId);
|
||||||
rsp.code = TSDB_CODE_SUCCESS;
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return rsp.code;
|
return rsp.code;
|
||||||
}
|
}
|
||||||
|
@ -1836,11 +1839,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
req.transId);
|
req.transId);
|
||||||
rsp.code = TSDB_CODE_SUCCESS;
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return rsp.code;
|
return rsp.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
|
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
|
||||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
|
@ -1848,6 +1853,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// continue after lock the meta again
|
// continue after lock the meta again
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
SStreamTask** ppHTask = NULL;
|
SStreamTask** ppHTask = NULL;
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
@ -1898,15 +1904,19 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||||
updateTasks, (numOfTasks - updateTasks));
|
updateTasks, (numOfTasks - updateTasks));
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
} else {
|
} else {
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
||||||
pMeta->startInfo.startAllTasksFlag = 0;
|
pMeta->startInfo.startAllTasksFlag = 0;
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
while (streamMetaTaskInTimer(pMeta)) {
|
while (streamMetaTaskInTimer(pMeta)) {
|
||||||
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||||
|
@ -1914,10 +1924,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
int32_t code = streamMetaReopen(pMeta);
|
int32_t code = streamMetaReopen(pMeta);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("vgId:%d failed to reopen stream meta", vgId);
|
tqError("vgId:%d failed to reopen stream meta", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1925,6 +1938,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
||||||
tqError("vgId:%d failed to load stream tasks", vgId);
|
tqError("vgId:%d failed to load stream tasks", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1938,6 +1952,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,18 +39,16 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
|
|
||||||
if (shouldIdle) {
|
if (shouldIdle) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
int32_t times = (--pMeta->walScanCounter);
|
int32_t times = (--pMeta->walScanCounter);
|
||||||
ASSERT(pMeta->walScanCounter >= 0);
|
ASSERT(pMeta->walScanCounter >= 0);
|
||||||
|
|
||||||
if (pMeta->walScanCounter <= 0) {
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
if (times <= 0) {
|
||||||
break;
|
break;
|
||||||
}
|
} else {
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
|
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosMsleep(SCAN_WAL_IDLE_DURATION);
|
taosMsleep(SCAN_WAL_IDLE_DURATION);
|
||||||
}
|
}
|
||||||
|
|
|
@ -554,10 +554,14 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
walApplyVer(pVnode->pWal, commitIdx);
|
walApplyVer(pVnode->pWal, commitIdx);
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
||||||
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
|
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
||||||
if (pVnode->pTq->pStreamMeta->startInfo.startAllTasksFlag) {
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
|
if (pMeta->startInfo.startAllTasksFlag) {
|
||||||
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
||||||
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,7 +578,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
tqDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
|
|
|
@ -185,6 +185,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
{ // todo: remove this when the pipeline checkpoint generating is used.
|
{ // todo: remove this when the pipeline checkpoint generating is used.
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
if (pMeta->chkptNotReadyTasks == 0) {
|
if (pMeta->chkptNotReadyTasks == 0) {
|
||||||
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
pMeta->chkptNotReadyTasks = pMeta->numOfStreamTasks;
|
||||||
|
@ -281,8 +282,10 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) {
|
||||||
|
|
||||||
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
@ -304,10 +307,11 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
char* str = NULL;
|
char* str = NULL;
|
||||||
streamTaskGetStatus(p, &str);
|
streamTaskGetStatus(p, &str);
|
||||||
|
|
||||||
int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
} else { // save the task
|
} else { // save the task
|
||||||
streamMetaSaveTask(pMeta, p);
|
streamMetaSaveTask(pMeta, p);
|
||||||
|
@ -320,17 +324,17 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
str);
|
str);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
code = streamMetaCommit(pMeta);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
if (code < 0) {
|
||||||
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
|
stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
|
||||||
checkpointId, terrstr());
|
checkpointId, terrstr());
|
||||||
return -1;
|
|
||||||
} else {
|
} else {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
|
stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
|
|
|
@ -356,17 +356,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
// 4. free it and remove fill-history task from disk meta-store
|
// 4. free it and remove fill-history task from disk meta-store
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||||
|
|
||||||
// 5. clear the link between fill-history task and stream task info
|
// 5. save to disk
|
||||||
// CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
|
|
||||||
|
|
||||||
// 6. save to disk
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
|
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL);
|
||||||
// streamMetaSaveTask(pMeta, pStreamTask);
|
|
||||||
// if (streamMetaCommit(pMeta) < 0) {
|
|
||||||
// persist to disk
|
|
||||||
// }
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
// 7. pause allowed.
|
// 7. pause allowed.
|
||||||
|
|
|
@ -492,6 +492,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
|
|
||||||
// pre-delete operation
|
// pre-delete operation
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
@ -509,9 +510,11 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
|
||||||
|
|
||||||
|
@ -522,20 +525,25 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
if ((*ppTask)->status.timerActive == 0) {
|
if ((*ppTask)->status.timerActive == 0) {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
|
stDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
} else {
|
} else {
|
||||||
taosRUnLockLatch(&pMeta->lock);
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// let's do delete of stream task
|
// let's do delete of stream task
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
pTask = *ppTask;
|
pTask = *ppTask;
|
||||||
|
@ -566,18 +574,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
return -1;
|
return code;
|
||||||
}
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo add error log
|
// todo add error log
|
||||||
|
@ -1013,6 +1019,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
||||||
bool inTimer = false;
|
bool inTimer = false;
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1028,6 +1035,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
return inTimer;
|
return inTimer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1038,6 +1047,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
|
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1052,6 +1062,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
// wait for the stream meta hb function stopping
|
// wait for the stream meta hb function stopping
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
|
|
|
@ -625,6 +625,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
SStreamMeta* pMeta = pInfo->pMeta;
|
SStreamMeta* pMeta = pInfo->pMeta;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
ASSERT((*ppTask)->status.timerActive >= 1);
|
ASSERT((*ppTask)->status.timerActive >= 1);
|
||||||
|
@ -638,10 +640,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
|
@ -934,66 +938,6 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
|
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
|
||||||
#if 0
|
|
||||||
int8_t status = pTask->status.taskStatus;
|
|
||||||
if (status == TASK_STATUS__DROPPING) {
|
|
||||||
stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* str = streamGetTaskStatusStr(status);
|
|
||||||
if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
|
|
||||||
stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
|
||||||
stInfo("vgId:%d s-task:%s pause stream sink task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
|
|
||||||
status = pTask->status.taskStatus;
|
|
||||||
if (status == TASK_STATUS__DROPPING) {
|
|
||||||
stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
|
|
||||||
stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
//
|
|
||||||
// if (pTask->status.downstreamReady == 0) {
|
|
||||||
// ASSERT(pTask->execInfo.start == 0);
|
|
||||||
// stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr);
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
|
|
||||||
const char* pStatus = streamGetTaskStatusStr(status);
|
|
||||||
stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo: use the task lock, stead of meta lock
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
|
||||||
|
|
||||||
status = pTask->status.taskStatus;
|
|
||||||
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
stDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
|
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
|
||||||
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
|
||||||
|
|
||||||
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
|
@ -1029,19 +973,6 @@ void streamTaskResume(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo fix race condition
|
|
||||||
void streamTaskDisablePause(SStreamTask* pTask) {
|
|
||||||
// pre-condition check
|
|
||||||
// const char* id = pTask->id.idStr;
|
|
||||||
// while (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
|
|
||||||
// stDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id);
|
|
||||||
// taosMsleep(100);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// stDebug("s-task:%s disable task pause", id);
|
|
||||||
// pTask->status.pauseAllowed = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamTaskEnablePause(SStreamTask* pTask) {
|
void streamTaskEnablePause(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s enable task pause", pTask->id.idStr);
|
stDebug("s-task:%s enable task pause", pTask->id.idStr);
|
||||||
pTask->status.pauseAllowed = 1;
|
pTask->status.pauseAllowed = 1;
|
||||||
|
@ -1051,6 +982,7 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-wlock", pMeta->vgId);
|
||||||
|
|
||||||
STaskId id = streamTaskExtractKey(pTask);
|
STaskId id = streamTaskExtractKey(pTask);
|
||||||
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
|
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
|
||||||
|
@ -1072,5 +1004,6 @@ int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
stDebug("vgId:%d meta-unlock", pMeta->vgId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue