fix(stream): fix error in start stream tasks.

This commit is contained in:
Haojun Liao 2023-04-21 09:37:16 +08:00
parent 471abd9160
commit 6c641cff39
4 changed files with 5 additions and 7 deletions

View File

@ -1016,8 +1016,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("s-task:%s stream task append submit into input queue failed", pTask->id.idStr);
atomic_sub_fetch_32(pRef, 1);
taosFreeQitem(pRefBlock);
continue;

View File

@ -15,7 +15,7 @@
#include "tq.h"
static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
@ -31,7 +31,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
// check all restore tasks
bool shouldIdle = true;
doCreateReqsByScanWal(pTq->pStreamMeta, &shouldIdle);
createStreamRunReq(pTq->pStreamMeta, &shouldIdle);
int32_t times = 0;
@ -76,7 +76,7 @@ static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
return pTaskIdList;
}
int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
int32_t vgId = pStreamMeta->vgId;

View File

@ -192,7 +192,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask != NULL) {
if (streamTaskShouldStop(&(*ppTask)->status)) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return *ppTask;

View File

@ -37,7 +37,7 @@ if $loop_count == 20 then
endi
if $rows != 4 then
print =====rows=$rows, expect 4
print =====rows=$rows expect 4
goto loop0
endi