fix:after restarting taosd, stream does not work.
This commit is contained in:
parent
f4f3b886fc
commit
8224d49702
|
@ -109,6 +109,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
||||||
|
SWal *pWal = pTask->exec.pWalReader->pWal;
|
||||||
|
if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) {
|
||||||
|
pTask->chkInfo.currentVer = pWal->vers.firstVer;
|
||||||
|
code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,6 +188,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
|
tFreeStreamTask(pTask);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
|
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
|
||||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue