fix(stream): opt task scan performance.

This commit is contained in:
Haojun Liao 2023-05-05 19:33:30 +08:00
parent 8b12d4d3da
commit 8958aabe4b
6 changed files with 49 additions and 30 deletions

View File

@ -340,6 +340,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb; TTB* pTaskDb;
TTB* pCheckpointDb; TTB* pCheckpointDb;
SHashObj* pTasks; SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*>
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;

View File

@ -153,11 +153,15 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// 2.save task // 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask); code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
if (code < 0) { if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock);
return -1; return -1;
} }
taosWUnLockLatch(&pSnode->pMeta->lock);
// 3.go through recover steps to fill history // 3.go through recover steps to fill history
if (pTask->fillHistory) { if (pTask->fillHistory) {
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);

View File

@ -781,13 +781,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tDecoderClear(&decoder); tDecoderClear(&decoder);
// 2.save task, use the newest commit version as the initial start version of stream task. // 2.save task, use the newest commit version as the initial start version of stream task.
taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
streamMetaGetNumOfTasks(pTq->pStreamMeta)); streamMetaGetNumOfTasks(pTq->pStreamMeta));
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock);
// 3.go through recover steps to fill history // 3.go through recover steps to fill history
if (pTask->fillHistory) { if (pTask->fillHistory) {
streamTaskCheckDownstream(pTask, sversion); streamTaskCheckDownstream(pTask, sversion);
@ -1324,12 +1328,11 @@ int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->wa
int32_t tqStartStreamTasks(STQ* pTq) { int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exists", vgId); tqInfo("vgId:%d no stream tasks exists", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);

View File

@ -262,14 +262,15 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
// taosWUnLockLatch(&pTq->lock); // taosWUnLockLatch(&pTq->lock);
} }
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
// push data for stream processing: // push data for stream processing:
// 1. the vnode has already been restored. // 1. the vnode has already been restored.
// 2. the vnode should be the leader. // 2. the vnode should be the leader.
// 3. the stream is not suspended yet. // 3. the stream is not suspended yet.
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) { if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) { if (numOfTasks == 0) {
return 0; return 0;
} }

View File

@ -57,42 +57,28 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
return 0; return 0;
} }
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
void* pIter = NULL;
taosWLockLatch(&pStreamMeta->lock);
while(1) {
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
taosArrayPush(pTaskIdList, &pTask->id.taskId);
}
taosWUnLockLatch(&pStreamMeta->lock);
return pTaskIdList;
}
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true; *pScanIdle = true;
bool noNewDataInWal = true; bool noNewDataInWal = true;
int32_t vgId = pStreamMeta->vgId; int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks); int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* pTaskList = NULL;
taosWLockLatch(&pStreamMeta->lock);
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
taosWUnLockLatch(&pStreamMeta->lock);
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks); tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
// update the new task number // update the new task number
numOfTasks = taosArrayGetSize(pTaskIdList); numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskIdList, i); int32_t* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
@ -166,7 +152,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true; *pScanIdle = true;
} }
taosArrayDestroy(pTaskIdList); taosArrayDestroy(pTaskList);
return 0; return 0;
} }

View File

@ -57,6 +57,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
if (pMeta->pTaskList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (streamMetaBegin(pMeta) < 0) { if (streamMetaBegin(pMeta) < 0) {
goto _err; goto _err;
} }
@ -70,6 +77,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
_err: _err:
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
@ -100,6 +108,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }
@ -180,11 +189,15 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
} }
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
return 0; return 0;
} }
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
return (int32_t) taosHashGetSize(pMeta->pTasks); size_t size = taosHashGetSize(pMeta->pTasks);
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
return (int32_t) size;
} }
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
@ -224,6 +237,15 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
for(int32_t i = 0; i < num; ++i) {
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (*pTaskId == taskId) {
taosArrayRemove(pMeta->pTaskList, i);
break;
}
}
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} }
@ -308,6 +330,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
if (pTask->fillHistory) { if (pTask->fillHistory) {
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
streamTaskCheckDownstream(pTask, ver); streamTaskCheckDownstream(pTask, ver);