fix(stream): set the correct step2 scan time window range.
This commit is contained in:
parent
a2694b9ce9
commit
d29f835a63
|
@ -172,7 +172,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
|
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||||
int32_t type, int64_t sver, int64_t ever);
|
int32_t type, int64_t sver, int64_t ever);
|
||||||
|
|
|
@ -1136,7 +1136,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pTask->streamTaskId.taskId, pTask->id.idStr);
|
pTask->streamTaskId.taskId, pTask->id.idStr);
|
||||||
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
tqDebug("s-task:%s scan-history-task set status to be dropping", id);
|
tqDebug("s-task:%s fill-history task set status to be dropping", id);
|
||||||
|
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1166,12 +1166,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id);
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
|
", do secondary scan-history data after halt the related stream task:%s",
|
||||||
|
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
st = taosGetTimestampMs();
|
st = taosGetTimestampMs();
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
|
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
||||||
|
|
|
@ -336,6 +336,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
|
||||||
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
|
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
|
||||||
|
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,21 +20,6 @@
|
||||||
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||||
const SMqMetaRsp* pRsp, int32_t vgId);
|
const SMqMetaRsp* pRsp, int32_t vgId);
|
||||||
|
|
||||||
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
|
|
||||||
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
|
|
||||||
if (code < 0) {
|
|
||||||
tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
|
||||||
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
|
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
|
||||||
pRsp->reqOffset = pReq->reqOffset;
|
pRsp->reqOffset = pReq->reqOffset;
|
||||||
|
|
||||||
|
|
|
@ -1800,8 +1800,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
|
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
|
||||||
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
|
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
|
||||||
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
|
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
|
||||||
pTSInfo->base.cond.endVersion, id);
|
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
|
||||||
|
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
|
||||||
|
pTSInfo->base.cond.twindows.ekey, id);
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = (void*)param;
|
SStreamTask* pTask = (void*)param;
|
||||||
|
|
||||||
int8_t status = atomic_load_8(&pTask->triggerStatus);
|
int8_t status = atomic_load_8(&pTask->triggerStatus);
|
||||||
qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status);
|
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);
|
||||||
|
|
||||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||||
streamMetaReleaseTask(NULL, pTask);
|
streamMetaReleaseTask(NULL, pTask);
|
||||||
|
@ -74,23 +74,22 @@ static void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
|
||||||
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
|
||||||
if (trigger == NULL) {
|
if (pTrigger == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
trigger->type = STREAM_INPUT__GET_RES;
|
pTrigger->type = STREAM_INPUT__GET_RES;
|
||||||
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
if (trigger->pBlock == NULL) {
|
if (pTrigger->pBlock == NULL) {
|
||||||
taosFreeQitem(trigger);
|
taosFreeQitem(pTrigger);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
trigger->pBlock->info.type = STREAM_GET_ALL;
|
|
||||||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
|
pTrigger->pBlock->info.type = STREAM_GET_ALL;
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
|
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) {
|
||||||
taosFreeQitem(trigger);
|
taosFreeQitem(pTrigger);
|
||||||
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -399,6 +398,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
|
|
||||||
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
||||||
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
||||||
|
qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -166,6 +166,8 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
|
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
|
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
|
||||||
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
|
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
|
||||||
|
@ -181,7 +183,10 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
||||||
return dst;
|
return dst;
|
||||||
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
|
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
|
||||||
// todo handle error
|
if (pMerged == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
|
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
|
||||||
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
|
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
|
||||||
|
@ -189,6 +194,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
||||||
taosFreeQitem(pElem);
|
taosFreeQitem(pElem);
|
||||||
return (SStreamQueueItem*)pMerged;
|
return (SStreamQueueItem*)pMerged;
|
||||||
} else {
|
} else {
|
||||||
|
qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -407,9 +407,11 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
streamTaskResumeFromHalt(pStreamTask);
|
streamTaskResumeFromHalt(pStreamTask);
|
||||||
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
|
||||||
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
||||||
|
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
|
streamMetaRemoveTask(pMeta, pTask->id.taskId);
|
||||||
|
|
||||||
// save to disk
|
// save to disk
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
@ -464,7 +466,12 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
// todo we need to sort the data block, instead of just appending into the array list.
|
// todo we need to sort the data block, instead of just appending into the array list.
|
||||||
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
||||||
if (newRet == NULL) {
|
if (newRet == NULL) {
|
||||||
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
if (terrno == 0) {
|
||||||
|
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
|
||||||
|
tstrerror(terrno));
|
||||||
|
}
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue