fix(stream): check status before scan history.
This commit is contained in:
parent
66697ee188
commit
bea8fe42db
|
@ -954,6 +954,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
SStreamTask* pStreamTask = NULL;
|
SStreamTask* pStreamTask = NULL;
|
||||||
|
char* pStatus = NULL;
|
||||||
|
|
||||||
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
@ -967,9 +968,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
|
|
||||||
SStreamTaskState s = streamTaskGetStatus(pTask);
|
SStreamTaskState s = streamTaskGetStatus(pTask);
|
||||||
if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 1)) {
|
pStatus = s.name;
|
||||||
tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not ready for scan-history data, quit", id, pMeta->vgId,
|
|
||||||
s.name, pTask->status.downstreamReady);
|
if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 0)) {
|
||||||
|
tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id,
|
||||||
|
pMeta->vgId, s.name, pTask->status.downstreamReady);
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
Loading…
Reference in New Issue