Merge pull request #22214 from taosdata/fix/3_liaohj
fix(stream): scan wal in step2
This commit is contained in:
commit
ccc0f13649
|
@ -45,6 +45,7 @@ enum {
|
||||||
TASK_STATUS__FAIL,
|
TASK_STATUS__FAIL,
|
||||||
TASK_STATUS__STOP,
|
TASK_STATUS__STOP,
|
||||||
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
|
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
|
||||||
|
TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
|
||||||
TASK_STATUS__HALT, // pause, but not be manipulated by user command
|
TASK_STATUS__HALT, // pause, but not be manipulated by user command
|
||||||
TASK_STATUS__PAUSE, // pause
|
TASK_STATUS__PAUSE, // pause
|
||||||
};
|
};
|
||||||
|
@ -302,6 +303,12 @@ typedef struct {
|
||||||
SStreamQueue* queue;
|
SStreamQueue* queue;
|
||||||
} STaskOutputInfo;
|
} STaskOutputInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t init;
|
||||||
|
int64_t step1Start;
|
||||||
|
int64_t step2Start;
|
||||||
|
} STaskTimestamp;
|
||||||
|
|
||||||
struct SStreamTask {
|
struct SStreamTask {
|
||||||
SStreamId id;
|
SStreamId id;
|
||||||
SSTaskBasicInfo info;
|
SSTaskBasicInfo info;
|
||||||
|
@ -316,7 +323,7 @@ struct SStreamTask {
|
||||||
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
|
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
|
||||||
int32_t nextCheckId;
|
int32_t nextCheckId;
|
||||||
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
|
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
|
||||||
int64_t initTs;
|
STaskTimestamp tsInfo;
|
||||||
// output
|
// output
|
||||||
union {
|
union {
|
||||||
STaskDispatcherFixedEp fixedEpDispatcher;
|
STaskDispatcherFixedEp fixedEpDispatcher;
|
||||||
|
@ -581,6 +588,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
||||||
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
||||||
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
||||||
bool streamTaskIsIdle(const SStreamTask* pTask);
|
bool streamTaskIsIdle(const SStreamTask* pTask);
|
||||||
|
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
|
||||||
|
|
||||||
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||||
|
@ -598,7 +606,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
||||||
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||||
|
|
||||||
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
|
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
|
||||||
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
|
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
|
||||||
|
@ -639,9 +647,9 @@ void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
// save to b-tree meta store
|
// save to b-tree meta store
|
||||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
|
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
|
||||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
|
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
|
||||||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
|
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
|
||||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->initTs = taosGetTimestampMs();
|
pTask->tsInfo.init = taosGetTimestampMs();
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
pTask->pMsgCb = &pSnode->msgCb;
|
pTask->pMsgCb = &pSnode->msgCb;
|
||||||
|
@ -160,7 +160,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
||||||
|
|
||||||
// 2.save task
|
// 2.save task
|
||||||
taosWLockLatch(&pSnode->pMeta->lock);
|
taosWLockLatch(&pSnode->pMeta->lock);
|
||||||
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask);
|
|
||||||
|
bool added = false;
|
||||||
|
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosWUnLockLatch(&pSnode->pMeta->lock);
|
taosWUnLockLatch(&pSnode->pMeta->lock);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -241,7 +241,7 @@ bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
|
||||||
SWalReader* tqGetWalReader(STqReader* pReader);
|
SWalReader* tqGetWalReader(STqReader* pReader);
|
||||||
SSDataBlock* tqGetResultBlock (STqReader* pReader);
|
SSDataBlock* tqGetResultBlock (STqReader* pReader);
|
||||||
|
|
||||||
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id);
|
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
|
||||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr);
|
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr);
|
||||||
|
|
|
@ -818,7 +818,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->initTs = taosGetTimestampMs();
|
pTask->tsInfo.init = taosGetTimestampMs();
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||||
|
@ -1039,27 +1039,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
||||||
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
|
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
// 2.save task, use the newest commit version as the initial start version of stream task.
|
// 2.save task, use the newest commit version as the initial start version of stream task.
|
||||||
int32_t taskId = 0;
|
int32_t taskId = pTask->id.taskId;
|
||||||
taosWLockLatch(&pStreamMeta->lock);
|
bool added = false;
|
||||||
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask);
|
|
||||||
|
|
||||||
taskId = pTask->id.taskId;
|
taosWLockLatch(&pStreamMeta->lock);
|
||||||
|
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
|
tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks);
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
taosWUnLockLatch(&pStreamMeta->lock);
|
taosWUnLockLatch(&pStreamMeta->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// not added into meta store
|
||||||
|
if (!added) {
|
||||||
|
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
|
||||||
|
tFreeStreamTask(pTask);
|
||||||
|
pTask = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pStreamMeta->lock);
|
taosWUnLockLatch(&pStreamMeta->lock);
|
||||||
tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
|
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
|
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
|
||||||
|
|
||||||
// 3. It's an fill history task, do nothing. wait for the main task to start it
|
// 3. It's an fill history task, do nothing. wait for the main task to start it
|
||||||
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
|
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
|
||||||
if (p != NULL) {
|
if (p != NULL) { // reset the downstreamReady flag.
|
||||||
streamTaskCheckDownstreamTasks(pTask);
|
p->status.downstreamReady = 0;
|
||||||
|
streamTaskCheckDownstreamTasks(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pStreamMeta, p);
|
streamMetaReleaseTask(pStreamMeta, p);
|
||||||
|
@ -1115,7 +1124,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamTaskDisablePause(pTask);
|
streamTaskDisablePause(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1123,11 +1132,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el);
|
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
|
||||||
|
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
SVersionRange* pRange = NULL;
|
SVersionRange* pRange = NULL;
|
||||||
SStreamTask* pStreamTask = NULL;
|
SStreamTask* pStreamTask = NULL;
|
||||||
|
bool done = false;
|
||||||
|
|
||||||
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
||||||
|
@ -1157,57 +1167,56 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// now we can stop the stream task execution
|
// now we can stop the stream task execution
|
||||||
streamTaskHalt(pStreamTask);
|
streamTaskHalt(pStreamTask);
|
||||||
tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel,
|
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
||||||
id);
|
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
pRange = &pTask->dataRange.range;
|
pRange = &pTask->dataRange.range;
|
||||||
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
||||||
streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (done) {
|
||||||
|
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||||
|
streamTaskEndScanWAL(pTask);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
} else {
|
||||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||||
", do secondary scan-history data after halt the related stream task:%s",
|
", do secondary scan-history from WAL after halt the related stream task:%s",
|
||||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
|
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
|
|
||||||
st = taosGetTimestampMs();
|
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
||||||
streamSourceScanHistoryData(pTask);
|
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
|
||||||
|
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
int64_t dstVer = pTask->dataRange.range.minVer - 1;
|
||||||
|
|
||||||
|
pTask->chkInfo.currentVer = dstVer;
|
||||||
|
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||||
|
tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||||
|
TASK_SCHED_STATUS__INACTIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
el = (taosGetTimestampMs() - st) / 1000.0;
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);
|
|
||||||
|
|
||||||
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
|
|
||||||
if (!pTask->status.transferState) {
|
|
||||||
code = streamDispatchTransferStateMsg(pTask);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
// todo handle error
|
|
||||||
}
|
|
||||||
|
|
||||||
pTask->status.transferState = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
||||||
// 5. resume the related stream task.
|
// 5. resume the related stream task.
|
||||||
streamTryExec(pTask);
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
|
|
||||||
|
tqStartStreamTasks(pTq);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// todo update the chkInfo version for current task.
|
// todo update the chkInfo version for current task.
|
||||||
// this task has an associated history stream task, so we need to scan wal from the end version of
|
// this task has an associated history stream task, so we need to scan wal from the end version of
|
||||||
|
@ -1217,12 +1226,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pTask->historyTaskId.taskId == 0) {
|
if (pTask->historyTaskId.taskId == 0) {
|
||||||
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
|
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
|
||||||
tqDebug(
|
tqDebug(
|
||||||
"s-task:%s scan history in stream time window completed, no related fill history task, reset the time "
|
"s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time "
|
||||||
"window:%" PRId64 " - %" PRId64,
|
"window:%" PRId64 " - %" PRId64,
|
||||||
id, pWindow->skey, pWindow->ekey);
|
id, pWindow->skey, pWindow->ekey);
|
||||||
|
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
|
||||||
} else {
|
} else {
|
||||||
|
// when related fill-history task exists, update the fill-history time window only when the
|
||||||
|
// state transfer is completed.
|
||||||
tqDebug(
|
tqDebug(
|
||||||
"s-task:%s scan history in stream time window completed, now start to handle data from WAL, start "
|
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
|
||||||
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
|
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
|
||||||
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
|
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
|
||||||
}
|
}
|
||||||
|
@ -1409,8 +1421,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
// even in halt status, the data in inputQ must be processed
|
// even in halt status, the data in inputQ must be processed
|
||||||
int8_t status = pTask->status.taskStatus;
|
int8_t st = pTask->status.taskStatus;
|
||||||
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
|
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
||||||
pTask->chkInfo.version);
|
pTask->chkInfo.version);
|
||||||
streamProcessRunReq(pTask);
|
streamProcessRunReq(pTask);
|
||||||
|
|
|
@ -35,7 +35,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
tqProcessSubmitReqForSubscribe(pTq);
|
tqProcessSubmitReqForSubscribe(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&pTq->pStreamMeta->lock);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
||||||
|
taosRUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
||||||
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
|
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
|
||||||
|
|
||||||
// push data for stream processing:
|
// push data for stream processing:
|
||||||
|
|
|
@ -302,13 +302,17 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
|
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
|
||||||
int32_t code = walNextValidMsg(pReader);
|
int32_t code = walNextValidMsg(pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ver = pReader->pHead->head.version;
|
int64_t ver = pReader->pHead->head.version;
|
||||||
|
if (ver > maxVer) {
|
||||||
|
tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||||
|
|
|
@ -38,9 +38,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
if (shouldIdle) {
|
if (shouldIdle) {
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
pMeta->walScanCounter -= 1;
|
times = (--pMeta->walScanCounter);
|
||||||
times = pMeta->walScanCounter;
|
|
||||||
|
|
||||||
ASSERT(pMeta->walScanCounter >= 0);
|
ASSERT(pMeta->walScanCounter >= 0);
|
||||||
|
|
||||||
if (pMeta->walScanCounter <= 0) {
|
if (pMeta->walScanCounter <= 0) {
|
||||||
|
@ -211,6 +209,17 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
|
||||||
|
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
|
||||||
|
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64
|
||||||
|
", not scan wal anymore, set the transfer state flag",
|
||||||
|
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
|
||||||
|
pTask->status.transferState = true;
|
||||||
|
|
||||||
|
/*int32_t code = */streamSchedExec(pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
*pScanIdle = true;
|
*pScanIdle = true;
|
||||||
bool noDataInWal = true;
|
bool noDataInWal = true;
|
||||||
|
@ -242,17 +251,26 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
int32_t status = pTask->status.taskStatus;
|
int32_t status = pTask->status.taskStatus;
|
||||||
|
|
||||||
// non-source or fill-history tasks don't need to response the WAL scan action.
|
// non-source or fill-history tasks don't need to response the WAL scan action.
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status != TASK_STATUS__NORMAL) {
|
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) {
|
||||||
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
|
||||||
|
ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
|
||||||
|
// the maximum version of data in the WAL has reached already, the step2 is done
|
||||||
|
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
||||||
|
pTask->dataRange.range.maxVer);
|
||||||
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (tInputQueueIsFull(pTask)) {
|
if (tInputQueueIsFull(pTask)) {
|
||||||
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
@ -269,12 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
|
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
|
||||||
|
int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX;
|
||||||
|
|
||||||
// append the data for the stream
|
|
||||||
SStreamQueueItem* pItem = NULL;
|
SStreamQueueItem* pItem = NULL;
|
||||||
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr);
|
||||||
|
|
||||||
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
|
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
|
||||||
|
checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -283,9 +302,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
noDataInWal = false;
|
noDataInWal = false;
|
||||||
code = tAppendDataToInputQueue(pTask, pItem);
|
code = tAppendDataToInputQueue(pTask, pItem);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
|
pTask->chkInfo.currentVer = ver;
|
||||||
pTask->chkInfo.currentVer);
|
checkForFillHistoryVerRange(pTask, ver);
|
||||||
|
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
|
||||||
} else {
|
} else {
|
||||||
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
|
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
|
||||||
pTask->chkInfo.currentVer);
|
pTask->chkInfo.currentVer);
|
||||||
|
|
|
@ -80,7 +80,6 @@ enum {
|
||||||
STREAM_RECOVER_STEP__PREPARE1,
|
STREAM_RECOVER_STEP__PREPARE1,
|
||||||
STREAM_RECOVER_STEP__PREPARE2,
|
STREAM_RECOVER_STEP__PREPARE2,
|
||||||
STREAM_RECOVER_STEP__SCAN1,
|
STREAM_RECOVER_STEP__SCAN1,
|
||||||
STREAM_RECOVER_STEP__SCAN2,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
extern int32_t exchangeObjRefPool;
|
extern int32_t exchangeObjRefPool;
|
||||||
|
|
|
@ -122,8 +122,9 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN);
|
qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX);
|
||||||
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
|
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
|
||||||
|
pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
||||||
|
@ -892,7 +893,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
|
||||||
pStreamInfo->recoverStep1Finished = false;
|
pStreamInfo->recoverStep1Finished = false;
|
||||||
pStreamInfo->recoverStep2Finished = false;
|
pStreamInfo->recoverStep2Finished = false;
|
||||||
|
|
||||||
qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
|
qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
|
||||||
" - %" PRId64,
|
" - %" PRId64,
|
||||||
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
||||||
pWindow->ekey);
|
pWindow->ekey);
|
||||||
|
@ -911,7 +912,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
|
||||||
pStreamInfo->recoverStep1Finished = true;
|
pStreamInfo->recoverStep1Finished = true;
|
||||||
pStreamInfo->recoverStep2Finished = false;
|
pStreamInfo->recoverStep2Finished = false;
|
||||||
|
|
||||||
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64
|
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
|
||||||
", window:%" PRId64 " - %" PRId64,
|
", window:%" PRId64 " - %" PRId64,
|
||||||
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
||||||
pWindow->ekey);
|
pWindow->ekey);
|
||||||
|
|
|
@ -1775,13 +1775,15 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
||||||
if (pWindow->skey != INT64_MIN) {
|
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
|
||||||
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
|
|
||||||
|
|
||||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||||
bool hasUnqualified = false;
|
bool hasUnqualified = false;
|
||||||
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||||
|
|
||||||
|
if (pWindow->skey != INT64_MIN) {
|
||||||
|
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
||||||
p[i] = (*ts >= pWindow->skey);
|
p[i] = (*ts >= pWindow->skey);
|
||||||
|
@ -1790,6 +1792,17 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
|
||||||
hasUnqualified = true;
|
hasUnqualified = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (pWindow->ekey != INT64_MAX) {
|
||||||
|
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
|
||||||
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
||||||
|
p[i] = (*ts <= pWindow->ekey);
|
||||||
|
|
||||||
|
if (!p[i]) {
|
||||||
|
hasUnqualified = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (hasUnqualified) {
|
if (hasUnqualified) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
|
@ -1858,6 +1871,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
|
qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
|
||||||
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
|
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
|
||||||
|
pStreamInfo->recoverScanFinished = false;
|
||||||
} else {
|
} else {
|
||||||
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
|
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
|
||||||
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
|
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
|
||||||
|
@ -1865,7 +1879,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
|
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
|
||||||
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
|
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
|
||||||
pTSInfo->base.cond.twindows.ekey, id);
|
pTSInfo->base.cond.twindows.ekey, id);
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
|
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
|
||||||
|
@ -1875,11 +1889,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pTSInfo->scanTimes = 0;
|
pTSInfo->scanTimes = 0;
|
||||||
pTSInfo->currentGroupId = -1;
|
pTSInfo->currentGroupId = -1;
|
||||||
pStreamInfo->recoverScanFinished = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
|
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||||
pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) {
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1890,35 +1902,35 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||||
return pInfo->pRecoverRes;
|
return pInfo->pRecoverRes;
|
||||||
} break;
|
} break;
|
||||||
case STREAM_SCAN_FROM_UPDATERES: {
|
// case STREAM_SCAN_FROM_UPDATERES: {
|
||||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
printDataBlock(pInfo->pUpdateRes, "recover update");
|
// printDataBlock(pInfo->pUpdateRes, "recover update");
|
||||||
return pInfo->pUpdateRes;
|
// return pInfo->pUpdateRes;
|
||||||
} break;
|
// } break;
|
||||||
case STREAM_SCAN_FROM_DELETE_DATA: {
|
// case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||||
printDataBlock(pInfo->pDeleteDataRes, "recover delete");
|
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
|
||||||
return pInfo->pDeleteDataRes;
|
// return pInfo->pDeleteDataRes;
|
||||||
} break;
|
// } break;
|
||||||
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
// if (pSDB) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
// checkUpdateData(pInfo, true, pSDB, false);
|
||||||
printDataBlock(pSDB, "scan recover update");
|
// printDataBlock(pSDB, "scan recover update");
|
||||||
calBlockTbName(pInfo, pSDB);
|
// calBlockTbName(pInfo, pSDB);
|
||||||
return pSDB;
|
// return pSDB;
|
||||||
}
|
// }
|
||||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
// blockDataCleanup(pInfo->pUpdateDataRes);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
} break;
|
// } break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1927,13 +1939,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pRecoverRes != NULL) {
|
if (pInfo->pRecoverRes != NULL) {
|
||||||
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
||||||
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
|
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
|
||||||
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||||
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
} else {
|
// } else {
|
||||||
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
|
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
|
||||||
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
|
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||||
|
|
|
@ -324,9 +324,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/
|
|
||||||
/*streamDispatchStreamBlock(pTask);*/
|
|
||||||
/*}*/
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -355,8 +355,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
pTask->status.transferState = false; // reset this value, to avoid transfer state again
|
// todo: destroy this task here
|
||||||
|
|
||||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
||||||
pTask->streamTaskId.taskId);
|
pTask->streamTaskId.taskId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
@ -510,13 +509,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
|
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
|
||||||
if (pInput == NULL) {
|
if (pInput == NULL) {
|
||||||
ASSERT(batchSize == 0);
|
ASSERT(batchSize == 0);
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
|
||||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -584,11 +576,35 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
|
||||||
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
|
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||||
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||||
|
|
||||||
|
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
|
||||||
|
pTask->status.transferState = true;
|
||||||
|
|
||||||
|
int32_t code = streamDispatchTransferStateMsg(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
// todo handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
// the last execution of fill-history task, in order to transfer task operator states.
|
||||||
|
code = streamTransferStateToStreamTask(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTryExec(SStreamTask* pTask) {
|
int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
// this function may be executed by multi-threads, so status check is required.
|
// this function may be executed by multi-threads, so status check is required.
|
||||||
int8_t schedStatus =
|
int8_t schedStatus =
|
||||||
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
|
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
|
||||||
|
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
|
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
|
||||||
int32_t code = streamExecForAll(pTask);
|
int32_t code = streamExecForAll(pTask);
|
||||||
if (code < 0) { // todo this status shoudl be removed
|
if (code < 0) { // todo this status shoudl be removed
|
||||||
|
@ -597,16 +613,27 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
|
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||||
|
// fill-history WAL scan has completed
|
||||||
|
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) {
|
||||||
|
streamTaskRecoverSetAllStepFinished(pTask);
|
||||||
|
streamTaskEndScanWAL(pTask);
|
||||||
|
} else {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->status.schedStatus);
|
pTask->status.schedStatus);
|
||||||
|
|
||||||
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
|
|
||||||
(!streamTaskShouldPause(&pTask->status))) {
|
|
||||||
streamSchedExec(pTask);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", pTask->id.idStr,
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
|
pTask->status.schedStatus);
|
||||||
|
|
||||||
|
if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
|
||||||
|
streamSchedExec(pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -237,7 +237,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to the ready tasks hash map, not the restored tasks hash map
|
// add to the ready tasks hash map, not the restored tasks hash map
|
||||||
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
|
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
|
||||||
|
*pAdded = false;
|
||||||
|
|
||||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||||
|
@ -261,13 +263,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
|
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
|
||||||
|
*pAdded = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
|
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
|
||||||
size_t size = taosHashGetSize(pMeta->pTasks);
|
size_t size = taosHashGetSize(pMeta->pTasks);
|
||||||
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
|
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
|
||||||
|
|
||||||
return (int32_t)size;
|
return (int32_t)size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
||||||
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
|
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
pTask->status.downstreamReady = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
int64_t el = (taosGetTimestampMs() - pTask->initTs);
|
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
|
||||||
|
|
||||||
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
|
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
|
||||||
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
|
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
@ -288,7 +288,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
|
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
|
qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
|
||||||
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,7 +511,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
|
||||||
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
|
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
|
qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
|
||||||
" ver range:%" PRId64 " - %" PRId64,
|
" ver range:%" PRId64 " - %" PRId64,
|
||||||
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
|
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
|
||||||
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
|
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
|
||||||
|
@ -661,7 +661,7 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
|
||||||
return qStreamRecoverSetAllStepFinished(exec);
|
return qStreamRecoverSetAllStepFinished(exec);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
SVersionRange* pRange = &pTask->dataRange.range;
|
SVersionRange* pRange = &pTask->dataRange.range;
|
||||||
ASSERT(latestVer >= pRange->maxVer);
|
ASSERT(latestVer >= pRange->maxVer);
|
||||||
|
|
||||||
|
@ -670,13 +670,16 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
// no input data yet. no need to execute the secondardy scan while stream task halt
|
// no input data yet. no need to execute the secondardy scan while stream task halt
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskRecoverSetAllStepFinished(pTask);
|
||||||
qDebug(
|
qDebug(
|
||||||
"s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan",
|
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
|
||||||
pTask->id.idStr);
|
"related stream task currentVer:%" PRId64,
|
||||||
|
pTask->id.idStr, latestVer);
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
|
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to
|
||||||
// [pTask->dataRange.range.maxVer, ver1]
|
// [pTask->dataRange.range.maxVer, ver1]
|
||||||
pRange->minVer = nextStartVer;
|
pRange->minVer = nextStartVer;
|
||||||
pRange->maxVer = latestVer - 1;
|
pRange->maxVer = latestVer - 1;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -848,7 +851,8 @@ void streamTaskPause(SStreamTask* pTask) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId);
|
const char* pStatus = streamGetTaskStatusStr(status);
|
||||||
|
qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue