refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-05-31 09:19:05 +08:00
parent eda06081ff
commit e237b4ac39
1 changed files with 8 additions and 5 deletions

View File

@ -978,17 +978,20 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tDecoderClear(&decoder);
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
// 2.save task, use the newest commit version as the initial start version of stream task.
taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosWLockLatch(&pStreamMeta->lock);
code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pStreamMeta->lock);
return -1;
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pStreamMeta->lock);
// 3.go through recover steps to fill history
if (pTask->fillHistory) {