fix(stream): set initial version for fill-history task. disable reentry of scan-history task function.

This commit is contained in:
Haojun Liao 2023-10-10 14:00:30 +08:00
parent 21457424a0
commit 05dfadd8f8
5 changed files with 72 additions and 37 deletions

View File

@ -272,6 +272,7 @@ typedef struct SStreamStatus {
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t pauseAllowed; // allowed task status to be set to be paused int8_t pauseAllowed; // allowed task status to be set to be paused
int32_t timerActive; // timer is active int32_t timerActive; // timer is active
int32_t inScanHistorySentinel;
} SStreamStatus; } SStreamStatus;
typedef struct SDataRange { typedef struct SDataRange {
@ -358,8 +359,6 @@ typedef struct STaskOutputInfo {
STaskSinkSma smaSink; STaskSinkSma smaSink;
STaskSinkFetch fetchSink; STaskSinkFetch fetchSink;
}; };
// void* pTimer; // timer for launch sink tasks
int8_t type; int8_t type;
STokenBucket* pTokenBucket; STokenBucket* pTokenBucket;
} STaskOutputInfo; } STaskOutputInfo;
@ -375,7 +374,7 @@ struct SStreamTask {
SSTaskBasicInfo info; SSTaskBasicInfo info;
STaskOutputQueue outputq; STaskOutputQueue outputq;
STaskInputInfo inputInfo; STaskInputInfo inputInfo;
STaskSchedInfo schedInfo; // todo remove it STaskSchedInfo schedInfo;
STaskOutputInfo outputInfo; STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo; SDispatchMsgInfo msgInfo;
SStreamStatus status; SStreamStatus status;

View File

@ -1037,14 +1037,15 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
return code; return code;
} }
// this function should be executed by only one thread
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
pMeta->vgId, pReq->taskId); pMeta->vgId, pReq->taskId);
return -1; return -1;
} }
@ -1052,16 +1053,38 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step1 // do recovery step1
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
// avoid multi-thread exec
while(1) {
int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
if (sentinel != 0) {
tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
taosMsleep(100);
} else {
break;
}
}
if (pTask->execInfo.step1Start == 0) { if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false); ASSERT(pTask->status.pauseAllowed == false);
pTask->execInfo.step1Start = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
pTask->execInfo.step1Start = ts;
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
// NOTE: in case of stream task, scan-history data in wal is not allowed to pause
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask); streamTaskEnablePause(pTask);
} }
} else { } else {
tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->execInfo.step1Start); if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start);
} else {
tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
} }
// we have to continue retrying to successfully execute the scan history task. // we have to continue retrying to successfully execute the scan history task.
@ -1070,6 +1093,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s failed to start scan-history in first stream time window since already started, unexpected " "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
"sched-status:%d", "sched-status:%d",
id, pTask->status.schedStatus); id, pTask->status.schedStatus);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
@ -1079,16 +1103,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
streamScanHistoryData(pTask); streamScanHistoryData(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
int8_t status = streamTaskSetSchedStatusInActive(pTask); int8_t status = streamTaskSetSchedStatusInActive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
// the following procedure should be executed, no matter status is stop/pause or not // the following procedure should be executed, no matter status is stop/pause or not
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -1106,6 +1132,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
} }
@ -1122,14 +1150,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
// now we can stop the stream task execution // now we can stop the stream task execution
int64_t latestVer = 0; int64_t nextProcessedVer = 0;
while (1) { while (1) {
taosThreadMutexLock(&pStreamTask->lock); taosThreadMutexLock(&pStreamTask->lock);
int8_t status = pStreamTask->status.taskStatus; int8_t status = pStreamTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
// return; // return; do nothing
// do nothing
} }
if (status == TASK_STATUS__HALT) { if (status == TASK_STATUS__HALT) {
@ -1160,9 +1187,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->status.keepTaskStatus = status; pStreamTask->status.keepTaskStatus = status;
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s",
pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus,
id);
taosThreadMutexUnlock(&pStreamTask->lock); taosThreadMutexUnlock(&pStreamTask->lock);
break; break;
@ -1170,10 +1199,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// if it's an source task, extract the last version in wal. // if it's an source task, extract the last version in wal.
pRange = &pTask->dataRange.range; pRange = &pTask->dataRange.range;
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) { if (done) {
pTask->execInfo.step2Start = taosGetTimestampMs();
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
streamTaskPutTranstateIntoInputQ(pTask); streamTaskPutTranstateIntoInputQ(pTask);
@ -1192,27 +1221,26 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pStreamTask->id.idStr); pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
pTask->execInfo.step2Start = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
int64_t dstVer = pTask->dataRange.range.minVer; int64_t dstVer = pTask->dataRange.range.minVer;
pTask->chkInfo.nextProcessVer = dstVer; pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
/*int8_t status = */streamTaskSetSchedStatusInActive(pTask); /*int8_t status = */streamTaskSetSchedStatusInActive(pTask);
// set the fill-history task to be normal // the fill-history task starts to process data in wal, let's set it status to be normal now
if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
} }
tqScanWalAsync(pTq, false); tqScanWalAsync(pTq, false);
} }
streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
@ -1233,14 +1261,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
code = streamTaskScanHistoryDataComplete(pTask); code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask);
// when all source task complete to scan history data in stream time window, they are allowed to handle stream data
// at the same time.
return code;
} }
return 0; atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
return code;
} }
// only the agg tasks and the sink tasks will receive this message from upstream tasks // only the agg tasks and the sink tasks will receive this message from upstream tasks

View File

@ -394,12 +394,17 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false; *pAdded = false;
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = streamTaskExtractKey(pTask);
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p != NULL) { if (p != NULL) {
return 0; return 0;
} }
if (pTask->info.fillHistory == 1) {
stDebug("s-task:0x%x initial nextProcessVer is set to 1 for fill-history task", pTask->id.taskId);
ver = 1;
}
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
return -1; return -1;

View File

@ -792,24 +792,24 @@ int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
return qStreamInfoResetTimewindowFilter(exec); return qStreamInfoResetTimewindowFilter(exec);
} }
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) {
SVersionRange* pRange = &pTask->dataRange.range; SVersionRange* pRange = &pTask->dataRange.range;
ASSERT(latestVer >= pRange->maxVer); ASSERT(nextProcessVer >= pRange->maxVer);
int64_t nextStartVer = pRange->maxVer + 1; int64_t walScanStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) { if (walScanStartVer > nextProcessVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt // no input data yet. no need to execute the secondary scan while stream task halt
streamTaskFillHistoryFinished(pTask); streamTaskFillHistoryFinished(pTask);
stDebug( stDebug(
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
"related stream task currentVer:%" PRId64, "related stream task currentVer:%" PRId64,
pTask->id.idStr, latestVer); pTask->id.idStr, nextProcessVer);
return true; return true;
} else { } else {
// 2. do secondary scan of the history data, the time window remain, and the version range is updated to // 2. do secondary scan of the history data, the time window remain, and the version range is updated to
// [pTask->dataRange.range.maxVer, ver1] // [pTask->dataRange.range.maxVer, ver1]
pRange->minVer = nextStartVer; pRange->minVer = walScanStartVer;
pRange->maxVer = latestVer - 1; pRange->maxVer = nextProcessVer - 1;
return false; return false;
} }
} }

View File

@ -56,6 +56,9 @@ class TDTestCase:
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
time.sleep(1)
for i in range(range_count): for i in range(range_count):
ts_value = str(date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' ts_value = str(date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value) ts_cast_delete_value = self.tdCom.time_cast(ts_value)
@ -75,6 +78,9 @@ class TDTestCase:
partition_elm = f'partition by {partition}' partition_elm = f'partition by {partition}'
else: else:
partition_elm = "" partition_elm = ""
time.sleep(1)
# if i == int(range_count/2): # if i == int(range_count/2):
if i > 2 and i % 3 == 0: if i > 2 and i % 3 == 0:
for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']: for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']: