fix(stream): fix the race condition during create new stream tasks.

This commit is contained in:
Haojun Liao 2023-04-20 23:21:18 +08:00
parent 65fded4a28
commit 5e6c06e253
8 changed files with 56 additions and 131 deletions

View File

@ -273,6 +273,7 @@ typedef struct SStreamId {
typedef struct SCheckpointInfo { typedef struct SCheckpointInfo {
int64_t id; int64_t id;
int64_t version; // offset in WAL int64_t version; // offset in WAL
int64_t currentVer;// current offset in WAL, not serialize it
} SCheckpointInfo; } SCheckpointInfo;
typedef struct SStreamStatus { typedef struct SStreamStatus {
@ -537,6 +538,7 @@ void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);

View File

@ -121,7 +121,6 @@ struct STQ {
TTB* pExecStore; TTB* pExecStore;
TTB* pCheckStore; TTB* pCheckStore;
SStreamMeta* pStreamMeta; SStreamMeta* pStreamMeta;
bool closing;
}; };
typedef struct { typedef struct {
@ -183,14 +182,8 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -97,7 +97,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
return NULL; return NULL;
} }
pTq->closing = false;
pTq->path = taosStrdup(path); pTq->path = taosStrdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->walLogLastVer = pVnode->pWal->vers.lastVer; pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
@ -157,7 +156,6 @@ void tqClose(STQ* pTq) {
void tqNotifyClose(STQ* pTq) { void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) { if (pTq != NULL) {
pTq->closing = true;
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
void* pIter = NULL; void* pIter = NULL;
@ -169,8 +167,12 @@ void tqNotifyClose(STQ* pTq) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING; pTask->status.taskStatus = TASK_STATUS__STOP;
int64_t st = taosGetTimestampMs();
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
@ -596,6 +598,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta; pTask->pMeta = pTq->pStreamMeta;
pTask->chkInfo.version = ver; pTask->chkInfo.version = ver;
pTask->chkInfo.currentVer = ver;
// expand executor // expand executor
if (pTask->fillHistory) { if (pTask->fillHistory) {
@ -989,14 +992,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t)); int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
*pRef = 1; *pRef = 1;
taosWLockLatch(&pTq->pStreamMeta->lock);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) {
SStreamTask* pTask = *(SStreamTask**)pIter; break;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; }
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver); SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);
if (!failed) { if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
@ -1006,7 +1016,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
atomic_add_fetch_32(pRefBlock->dataRef, 1); atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->id.taskId); qError("s-task:%s stream task input del failed", pTask->id.idStr);
atomic_sub_fetch_32(pRef, 1); atomic_sub_fetch_32(pRef, 1);
taosFreeQitem(pRefBlock); taosFreeQitem(pRefBlock);
@ -1014,7 +1024,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
} }
if (streamSchedExec(pTask) < 0) { if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->id.taskId); qError("s-task:%s stream task launch failed", pTask->id.idStr);
continue; continue;
} }
@ -1023,8 +1033,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
} }
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock);
int32_t ref = atomic_sub_fetch_32(pRef, 1); int32_t ref = atomic_sub_fetch_32(pRef, 1);
/*A(ref >= 0);*/
if (ref == 0) { if (ref == 0) {
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
taosMemoryFree(pRef); taosMemoryFree(pRef);
@ -1055,23 +1066,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
} }
blockDataDestroy(pDelBlock); blockDataDestroy(pDelBlock);
#endif #endif
return 0; return 0;
} }
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
const char* key, int64_t ver) {
doSaveTaskOffset(pOffsetStore, key, ver);
int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
// remove the offset, if all functions are completed successfully.
if (code == TSDB_CODE_SUCCESS) {
tqOffsetDelete(pOffsetStore, key);
}
return code;
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0 #if 0
void* pIter = NULL; void* pIter = NULL;
@ -1333,8 +1330,6 @@ int32_t tqStartStreamTasks(STQ* pTq) {
} }
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
initOffsetForAllRestoreTasks(pTq);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = WAL_READ_TASKS_ID; pRunReq->taskId = WAL_READ_TASKS_ID;

View File

@ -1023,6 +1023,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
// update the table list handle for each stream scanner/wal reader // update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) { if (pIter == NULL) {
@ -1039,5 +1040,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0; return 0;
} }

View File

@ -15,12 +15,12 @@
#include "tq.h" #include "tq.h"
static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle); static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, bool* pScanIdle);
// this function should be executed by stream threads. // this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure // there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually. // will not stop eventually.
int tqStreamTasksScanWal(STQ* pTq) { int32_t tqStreamTasksScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -31,7 +31,7 @@ int tqStreamTasksScanWal(STQ* pTq) {
// check all restore tasks // check all restore tasks
bool shouldIdle = true; bool shouldIdle = true;
doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle); doCreateReqsByScanWal(pTq->pStreamMeta, &shouldIdle);
int32_t times = 0; int32_t times = 0;
@ -76,7 +76,7 @@ static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
return pTaskIdList; return pTaskIdList;
} }
int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) { int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true; *pScanIdle = true;
bool noNewDataInWal = true; bool noNewDataInWal = true;
int32_t vgId = pStreamMeta->vgId; int32_t vgId = pStreamMeta->vgId;
@ -99,12 +99,14 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
} }
int32_t status = pTask->status.taskStatus; int32_t status = pTask->status.taskStatus;
if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (status == TASK_STATUS__DROPPING)) { if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status); tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
@ -122,19 +124,15 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
*pScanIdle = false; *pScanIdle = false;
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
ASSERT(pOffset != NULL);
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version); int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
// append the data for the stream // append the data for the stream
tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr); tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pTask->chkInfo.currentVer, pTask->id.idStr);
SPackedData packData = {0}; SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
@ -156,9 +154,9 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr); tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pOffset->val.version); pTask->chkInfo.currentVer);
} else { } else {
// do nothing // do nothing
} }

View File

@ -55,76 +55,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void initOffsetForAllRestoreTasks(STQ* pTq) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
tqDebug("s-task:%s not source task, not register offset", pTask->id.idStr);
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s no need to record the offset, status %d", pTask->id.idStr, pTask->status.taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version);
}
}
}
void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, ver);
}
}
}
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
// keep the offset info in the offset store
tqOffsetWrite(pOffsetStore, &offset);
}
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) { static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
pRsp->reqOffset = pReq->reqOffset; pRsp->reqOffset = pReq->reqOffset;

View File

@ -17,6 +17,11 @@
#define STREAM_EXEC_MAX_BATCH_NUM 100 #define STREAM_EXEC_MAX_BATCH_NUM 100
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
@ -66,7 +71,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
// pExecutor // pExecutor
while (1) { while (1) {
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { if (streamTaskShouldStop(&pTask->status)) {
return 0; return 0;
} }
@ -134,7 +139,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return 0; return 0;
} }
@ -270,7 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
} }
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { if (streamTaskShouldStop(&pTask->status)) {
if (pInput) { if (pInput) {
streamFreeQitem(pInput); streamFreeQitem(pInput);
} }
@ -368,7 +373,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr); qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) { if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
streamSchedExec(pTask); streamSchedExec(pTask);
} }
} }

View File

@ -191,10 +191,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) { if (ppTask != NULL) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1); if (streamTaskShouldStop(&(*ppTask)->status)) {
taosRUnLockLatch(&pMeta->lock); atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
return *ppTask; taosRUnLockLatch(&pMeta->lock);
return *ppTask;
}
} }
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
@ -205,7 +207,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING); ASSERT(streamTaskShouldStop(&pTask->status));
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
} }
} }
@ -216,11 +218,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
/*if (pTask->timer) {
* taosTmrStop(pTask->timer);*/ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
/*pTask->timer = NULL;*/
/*}*/
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);