fix(stream): disable stream task when no tasks exist.

This commit is contained in:
Haojun Liao 2023-04-16 23:07:54 +08:00
parent 1350af5267
commit be90d2c511
2 changed files with 8 additions and 4 deletions

View File

@ -346,7 +346,6 @@ typedef struct SStreamMeta {
int32_t vgId;
SRWLatch lock;
int8_t walScan;
bool quit;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);

View File

@ -111,7 +111,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
tqInitialize(pVnode->pTq);
tqInitialize(pTq);
return pTq;
}
@ -1281,6 +1281,13 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exists", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0;
}
pMeta->walScan += 1;
if (pMeta->walScan > 1) {
@ -1297,8 +1304,6 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return -1;
}
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
initOffsetForAllRestoreTasks(pTq);