Merge pull request #20967 from taosdata/fix/liaohj_main
fix(stream): update the reference count value to be int32
This commit is contained in:
commit
915e5acb87
|
@ -345,7 +345,7 @@ typedef struct SStreamMeta {
|
||||||
FTaskExpand* expandFunc;
|
FTaskExpand* expandFunc;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t walScan;
|
int32_t walScan;
|
||||||
} SStreamMeta;
|
} SStreamMeta;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||||
|
|
|
@ -27,31 +27,37 @@ int tqStreamTasksScanWal(STQ* pTq) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
tqInfo("vgId:%d continue check if data in wal are available", vgId);
|
int32_t scan = pMeta->walScan;
|
||||||
|
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
|
||||||
|
|
||||||
// check all restore tasks
|
// check all restore tasks
|
||||||
bool allFull = true;
|
bool shouldIdle = true;
|
||||||
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
|
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
|
||||||
|
|
||||||
int32_t times = 0;
|
int32_t times = 0;
|
||||||
|
|
||||||
if (allFull) {
|
if (shouldIdle) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
pMeta->walScan -= 1;
|
pMeta->walScan -= 1;
|
||||||
times = pMeta->walScan;
|
times = pMeta->walScan;
|
||||||
|
|
||||||
|
ASSERT(pMeta->walScan >= 0);
|
||||||
|
|
||||||
if (pMeta->walScan <= 0) {
|
if (pMeta->walScan <= 0) {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times);
|
tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
int64_t el = (taosGetTimestampMs() - st);
|
||||||
tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);
|
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el);
|
||||||
|
|
||||||
|
// restore wal scan flag
|
||||||
|
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,8 +102,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t status = pTask->status.taskStatus;
|
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
|
||||||
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
|
pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
|
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
|
||||||
pTask->status.taskStatus);
|
pTask->status.taskStatus);
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue