fix(stream): add lock during check wal to create new stream task.
This commit is contained in:
parent
742b5ee08c
commit
b60b1796f7
|
@ -15,8 +15,7 @@
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
|
||||
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
|
||||
static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
|
||||
|
||||
// this function should be executed by stream threads.
|
||||
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
|
||||
|
@ -32,7 +31,7 @@ int tqStreamTasksScanWal(STQ* pTq) {
|
|||
|
||||
// check all restore tasks
|
||||
bool shouldIdle = true;
|
||||
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
|
||||
doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
|
||||
|
||||
int32_t times = 0;
|
||||
|
||||
|
@ -55,50 +54,50 @@ int tqStreamTasksScanWal(STQ* pTq) {
|
|||
|
||||
int64_t el = (taosGetTimestampMs() - st);
|
||||
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el);
|
||||
|
||||
// restore wal scan flag
|
||||
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
|
||||
// int32_t numOfTask = taosArrayGetSize(pTaskList);
|
||||
// if (numOfTask <= 0) {
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
//
|
||||
// // todo: add lock
|
||||
// for (int32_t i = 0; i < numOfTask; ++i) {
|
||||
// SStreamTask* pTask = taosArrayGetP(pTaskList, i);
|
||||
// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64,
|
||||
// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
|
||||
// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||
//
|
||||
// // NOTE: do not change the following order
|
||||
// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
|
||||
// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
|
||||
// }
|
||||
//
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
|
||||
int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
|
||||
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
|
||||
SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
|
||||
void* pIter = NULL;
|
||||
int32_t vgId = pStreamMeta->vgId;
|
||||
|
||||
*pScanIdle = true;
|
||||
|
||||
bool allWalChecked = true;
|
||||
tqDebug("vgId:%d start to check wal to extract new submit block", vgId);
|
||||
|
||||
while (1) {
|
||||
taosWLockLatch(&pStreamMeta->lock);
|
||||
while(1) {
|
||||
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
taosArrayPush(pTaskIdList, &pTask->id.taskId);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStreamMeta->lock);
|
||||
return pTaskIdList;
|
||||
}
|
||||
|
||||
int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
|
||||
*pScanIdle = true;
|
||||
bool noNewDataInWal = true;
|
||||
int32_t vgId = pStreamMeta->vgId;
|
||||
|
||||
int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks);
|
||||
if (numOfTasks == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
||||
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
|
||||
|
||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
int32_t* pTaskId = taosArrayGet(pTaskIdList, i);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
|
||||
if (pTask == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -106,6 +105,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
|
||||
pTask->status.taskStatus);
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -115,6 +115,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
|
||||
if (tInputQueueIsFull(pTask)) {
|
||||
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -127,6 +128,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
// seek the stored version and extract data from WAL
|
||||
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
|
||||
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -136,6 +138,7 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
SPackedData packData = {0};
|
||||
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
|
||||
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -143,10 +146,11 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
if (p == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
allWalChecked = false;
|
||||
noNewDataInWal = false;
|
||||
|
||||
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
|
||||
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
|
||||
|
@ -160,11 +164,15 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
|
|||
|
||||
streamDataSubmitDestroy(p);
|
||||
taosFreeQitem(p);
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
}
|
||||
|
||||
if (allWalChecked) {
|
||||
// all wal are checked, and no new data available in wal.
|
||||
if (noNewDataInWal) {
|
||||
*pScanIdle = true;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTaskIdList);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -84,11 +84,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
tdbClose(pMeta->db);
|
||||
|
||||
void* pIter = NULL;
|
||||
// while(pMeta->walScan) {
|
||||
// qDebug("wait stream daemon quit");
|
||||
// taosMsleep(100);
|
||||
// }
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pMeta->pTasks, pIter);
|
||||
if (pIter == NULL) {
|
||||
|
@ -102,7 +97,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
tFreeStreamTask(pTask);
|
||||
/*streamMetaReleaseTask(pMeta, pTask);*/
|
||||
}
|
||||
|
||||
taosHashCleanup(pMeta->pTasks);
|
||||
|
|
Loading…
Reference in New Issue