fix(stream): seek to right place to start wal read.
This commit is contained in:
parent
49aff4571b
commit
649d26ce72
|
@ -1583,9 +1583,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
if (pauseReq.igNotExists) {
|
if (pauseReq.igNotExists) {
|
||||||
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
|
mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
mError("stream:%s not exist, failed to pause stream", pauseReq.name);
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1671,10 +1672,11 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
if (pauseReq.igNotExists) {
|
if (pauseReq.igNotExists) {
|
||||||
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
|
mInfo("stream:%s not exist, not resume stream", pauseReq.name);
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
mError("stream:%s not exist, failed to resume stream", pauseReq.name);
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -223,14 +223,6 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
|
||||||
pTask->chkInfo.nextProcessVer);
|
pTask->chkInfo.nextProcessVer);
|
||||||
} else if (currentVer != pTask->chkInfo.nextProcessVer) {
|
|
||||||
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d s-task:%s wal reader seek back to ver:%" PRId64, vgId, pTask->id.idStr,
|
|
||||||
pTask->chkInfo.nextProcessVer);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,17 +304,17 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int32_t numOfNewItems = 0;
|
int32_t numOfNewItems = 0;
|
||||||
|
|
||||||
while(1) {
|
while (1) {
|
||||||
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
|
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
|
||||||
*numOfItems += numOfNewItems;
|
*numOfItems += numOfNewItems;
|
||||||
return numOfNewItems > 0;
|
return numOfNewItems > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamQueueItem* pItem = NULL;
|
SStreamQueueItem* pItem = NULL;
|
||||||
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
|
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
|
||||||
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
|
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
|
||||||
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
|
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
|
||||||
if (itemInFillhistory) {
|
if (itemInFillhistory) {
|
||||||
numOfNewItems += 1;
|
numOfNewItems += 1;
|
||||||
}
|
}
|
||||||
|
@ -342,7 +334,9 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
|
walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
|
||||||
|
tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id,
|
||||||
|
pTask->chkInfo.nextProcessVer);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue