Merge pull request #24107 from taosdata/fix/3_liaohj
fix(stream): add check for null ptr and leader .
This commit is contained in:
commit
79672f8e5f
|
@ -323,7 +323,6 @@ typedef struct SStreamStatus {
|
||||||
int8_t schedStatus;
|
int8_t schedStatus;
|
||||||
int8_t keepTaskStatus;
|
int8_t keepTaskStatus;
|
||||||
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
|
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
|
||||||
int8_t pauseAllowed; // allowed task status to be set to be paused
|
|
||||||
int32_t timerActive; // timer is active
|
int32_t timerActive; // timer is active
|
||||||
int32_t inScanHistorySentinel;
|
int32_t inScanHistorySentinel;
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
|
@ -369,6 +369,7 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
|
@ -380,6 +381,12 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
|
||||||
tDecodeCompleteHistoryDataMsg(&decoder, &req);
|
tDecodeCompleteHistoryDataMsg(&decoder, &req);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||||
|
tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower",
|
||||||
|
req.upstreamTaskId, pMeta->vgId);
|
||||||
|
return TASK_DOWNSTREAM_NOT_LEADER;
|
||||||
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||||
|
@ -396,11 +403,11 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
|
||||||
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
|
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
|
||||||
"completed msg",
|
"completed msg",
|
||||||
pTask->id.idStr, req.downstreamId);
|
pTask->id.idStr, req.downstreamId);
|
||||||
streamProcessScanHistoryFinishRsp(pTask);
|
code = streamProcessScanHistoryFinishRsp(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
|
@ -476,11 +476,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
// automatically set the related fill-history task to be failed.
|
// automatically set the related fill-history task to be failed.
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
|
int64_t current = taosGetTimestampMs();
|
||||||
|
|
||||||
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
|
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
|
||||||
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
|
if (pHTask != NULL) {
|
||||||
taosGetTimestampMs(), false);
|
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current,
|
||||||
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
false);
|
||||||
|
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
||||||
|
} else {
|
||||||
|
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||||
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
||||||
|
@ -657,6 +662,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
||||||
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
|
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
|
||||||
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||||
|
|
||||||
|
// task restart now, not handle the scan-history finish rsp
|
||||||
|
if (status == TASK_STATUS__UNINIT) {
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY);
|
ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY);
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
|
|
|
@ -91,6 +91,8 @@ class TDTestCase:
|
||||||
tdLog.info("loop wait result ...")
|
tdLog.info("loop wait result ...")
|
||||||
tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5)
|
tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5)
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
# check all data is correct
|
# check all data is correct
|
||||||
sql = "select * from sta where cnt != 20;"
|
sql = "select * from sta where cnt != 20;"
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
|
|
Loading…
Reference in New Issue