refactor: remove sleep.

This commit is contained in:
parent 5f7ce21530
commit d4fd544c74
@@ -548,17 +548,9 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
   return code;
 }
 
-static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) {
-  SStreamStatus* pStatus = &pTask->status;
-
-  pStatus->schedIdleTime = idleTime;
-  pStatus->lastExecTs = taosGetTimestampMs();
-}
-
-static void clearTaskSchedInfo(SStreamTask* pTask) {
-  SStreamStatus* pStatus = &pTask->status;
-  pStatus->schedIdleTime = 0;
-}
+static void setTaskSchedInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
+static void clearTaskSchedInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
+static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; }
 
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
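The one-line helpers also split the write paths: setTaskSchedInfo no longer touches lastExecTs, which setLastExecTs now owns. For context, a self-contained C sketch of the pattern this commit moves toward: record an idle interval on the task and return, instead of sleeping on the worker thread. STask, SStatus, nowMs and execOnce are illustrative stand-ins, not code from this commit.

/* Sketch only: stand-ins for SStreamTask/SStreamStatus/taosGetTimestampMs(). */
#include <stdint.h>
#include <stdio.h>
#include <time.h>

typedef struct {
  int32_t schedIdleTime;  /* ms to stay idle before the next exec round */
  int64_t lastExecTs;     /* wall-clock ms of the last exec round */
} SStatus;

typedef struct {
  SStatus status;
} STask;

static int64_t nowMs(void) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

static void setTaskSchedInfo(STask* t, int32_t idleTime) { t->status.schedIdleTime = idleTime; }
static void clearTaskSchedInfo(STask* t) { t->status.schedIdleTime = 0; }
static void setLastExecTs(STask* t, int64_t ts) { t->status.lastExecTs = ts; }

/* Exec path: on a transient condition (e.g. full output queue), record an
 * idle interval and return at once instead of calling taosMsleep(). */
static int32_t execOnce(STask* t, int outputQueueFull) {
  if (outputQueueFull) {
    setTaskSchedInfo(t, 500);  /* ask to be re-invoked in 500 ms */
    return 0;                  /* worker thread is released immediately */
  }
  clearTaskSchedInfo(t);
  setLastExecTs(t, nowMs());
  /* ... process a batch of blocks here ... */
  return 0;
}

int main(void) {
  STask t = {0};
  execOnce(&t, 1);
  printf("task asked to idle for %d ms\n", (int)t.status.schedIdleTime);
  return 0;
}

The design point: taosMsleep() pins a worker thread for the whole wait, while a recorded schedIdleTime lets streamResumeTask (further down) re-arm the task on a timer and give the thread back to the pool.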
@@ -574,21 +566,28 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
   int32_t           blockSize = 0;
   int32_t           numOfBlocks = 0;
   SStreamQueueItem* pInput = NULL;
 
   if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) {
     stDebug("s-task:%s stream task is stopped", id);
-    break;
+    return 0;
   }
 
   if (streamQueueIsFull(pTask->outputq.queue)) {
     stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
-    break;
+    setTaskSchedInfo(pTask, 500);
+    return 0;
   }
 
   if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
-    stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
-    break;
+    stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
+    setTaskSchedInfo(pTask, 1000);
+    return 0;
   }
 
+  if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) {
+    stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id);
+    setTaskSchedInfo(pTask, 50);
+    return 0;
+  }
+
   /*int32_t code = */ streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
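The new 50 ms check is a self-throttle on invocation frequency. A sketch of the guard, continuing the stand-in STask/nowMs/setTaskSchedInfo/setLastExecTs definitions from the sketch above:

/* Ran too recently: park for 50 ms rather than spin on the worker pool. */
static int32_t throttledExec(STask* t) {
  if (nowMs() - t->status.lastExecTs < 50) {
    setTaskSchedInfo(t, 50);
    return 0;
  }
  setLastExecTs(t, nowMs());
  /* ... drain the input queue and execute ... */
  return 0;
}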
@@ -597,9 +596,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
     return 0;
   }
 
-
+  int32_t type = pInput->type;
   // dispatch checkpoint msg to all downstream tasks
-  int32_t type = pInput->type;
   if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
     streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput);
     continue;
@@ -646,7 +644,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
     if (ver != pInfo->processedVer) {
       stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
               " ckpt:%" PRId64,
-              pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
+              id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
       pInfo->processedVer = ver;
     }
 
@@ -659,7 +657,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
     // todo add lock
     SStreamTaskState* pState = streamTaskGetStatus(pTask);
     if (pState->state == TASK_STATUS__CK) {
-      stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, pState->name);
+      stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name);
       streamTaskBuildCheckpoint(pTask);
     } else {
       // todo refactor
@@ -672,8 +670,8 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
 
       if (code != TSDB_CODE_SUCCESS) {
         // todo: let's retry send rsp to upstream/mnode
-        stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr,
-                0, tstrerror(code));
+        stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0,
+                tstrerror(code));
       }
     }
 
@@ -774,19 +772,24 @@ int32_t streamResumeTask(SStreamTask* pTask) {
 
+    // check if this task needs to be idle for a while
+    if (pTask->status.schedIdleTime > 0) {
+      stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime);
+      schedTaskInFuture(pTask);
+
+      taosThreadMutexUnlock(&pTask->lock);
+      setLastExecTs(pTask, taosGetTimestampMs());
+      return 0;
+    } else {
       int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
 
       if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
         atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
         taosThreadMutexUnlock(&pTask->lock);
 
+        setLastExecTs(pTask, taosGetTimestampMs());
+
         char* p = streamTaskGetStatus(pTask)->name;
-        stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus);
+        stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
+                pTask->status.schedStatus, pTask->status.lastExecTs);
 
         return 0;
       }
+    }
 
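This is where the recorded schedIdleTime is honored: streamResumeTask re-arms the task for the future instead of sleeping. The commit does not show schedTaskInFuture's body; the sketch below only guesses at its minimal shape, and timerStart/resumeExec are invented placeholders, not TDengine APIs.

typedef void (*timer_cb)(void* arg);

/* Assumed primitives, for this sketch only. */
extern void timerStart(timer_cb cb, int32_t delayMs, void* arg); /* one-shot timer */
extern void resumeExec(void* task);                              /* re-enqueue task on a worker pool */

static void onIdleExpired(void* arg) {
  /* idle window elapsed; hand the task back to the workers */
  resumeExec(arg);
}

static void schedTaskInFutureSketch(void* task, int32_t idleMs) {
  timerStart(onIdleExpired, idleMs, task);
}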
@@ -17,8 +17,7 @@
 
 #define MAX_STREAM_EXEC_BATCH_NUM       32
 #define MAX_SMOOTH_BURST_RATIO          5   // 5 sec
-#define WAIT_FOR_DURATION               40
-#define OUTPUT_QUEUE_FULL_WAIT_DURATION 500  // 500 ms
+#define WAIT_FOR_DURATION               10
 
 // todo refactor:
 // read data from input queue
@@ -161,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput
     // no available token in bucket for sink task, let's wait for a little bit
     if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
       stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
-      taosMsleep(10);
+      taosMsleep(WAIT_FOR_DURATION);
       return TSDB_CODE_SUCCESS;
     }
 
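The sink branch rate-limits via a token bucket: streamTaskExtractAvailableToken either takes a token or tells the caller to idle for WAIT_FOR_DURATION and retry, and the MAX_SMOOTH_BURST_RATIO constant above suggests the bucket tolerates a few seconds' worth of burst. A self-contained sketch of such a bucket, with invented field and function names (the real one is internal to the stream module):

#include <stdbool.h>
#include <stdint.h>

typedef struct {
  int32_t capacity;    /* burst bound, e.g. rate * MAX_SMOOTH_BURST_RATIO */
  int32_t numOfToken;  /* tokens currently available */
  int32_t rate;        /* tokens refilled per second */
  int64_t fillTs;      /* last refill time, ms */
} STokenBucketSketch;

static bool tryExtractToken(STokenBucketSketch* b, int64_t nowTs) {
  int32_t inc = (int32_t)((nowTs - b->fillTs) * b->rate / 1000);
  if (inc > 0) {  /* lazy refill, capped at capacity */
    b->numOfToken = (b->numOfToken + inc > b->capacity) ? b->capacity : b->numOfToken + inc;
    b->fillTs = nowTs;
  }
  if (b->numOfToken > 0) {
    b->numOfToken -= 1;
    return true;   /* proceed with one dispatch */
  }
  return false;    /* caller idles WAIT_FOR_DURATION ms and retries */
}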
@@ -173,11 +172,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput
 
     SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
     if (qItem == NULL) {
-      if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
-        taosMsleep(WAIT_FOR_DURATION);
-        // todo remove it
-        continue;
-      }
+      // if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
+      //   taosMsleep(WAIT_FOR_DURATION);
+      //   continue;
+      // }
 
       // restore the token to bucket
       if (*numOfBlocks > 0) {
@@ -344,25 +342,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
 // the result should be put into the outputQ in any cases, the result may be lost otherwise.
 int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
   STaosQueue* pQueue = pTask->outputq.queue->pQueue;
-
-#if 0
-  // wait for the output queue is available for new data to dispatch
-  while (streamQueueIsFull(pTask->outputq.queue)) {
-    if (streamTaskShouldStop(pTask)) {
-      stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
-      return TSDB_CODE_STREAM_EXEC_CANCELLED;
-    }
-
-    int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
-    double  size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
-    // let's wait for there are enough space to hold this result pBlock
-    stDebug("s-task:%s outputQ is full, wait for %dms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
-            OUTPUT_QUEUE_FULL_WAIT_DURATION, total, size);
-
-    taosMsleep(OUTPUT_QUEUE_FULL_WAIT_DURATION);
-  }
-#endif
-
   int32_t code = taosWriteQitem(pQueue, pBlock);
 
   int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue);
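With the #if 0 busy-wait deleted outright, streamTaskPutDataIntoOutputQ never blocks: the block is always enqueued, and backpressure is applied one level up, where doStreamExecTask sees streamQueueIsFull() and idles the task for 500 ms before its next round. A compile-only sketch of the resulting shape; Queue, queueWrite and queueNumOfItems stand in for STaosQueue, taosWriteQitem and streamQueueGetNumOfItems:

#include <stdint.h>

typedef struct Queue Queue;                      /* opaque stand-in */
extern int32_t queueWrite(Queue* q, void* item); /* assumed: 0 on success */
extern int32_t queueNumOfItems(const Queue* q);

static int32_t putIntoOutputQSketch(Queue* outputQ, void* block) {
  /* enqueue unconditionally; a full queue is handled before the next exec
   * round instead of pinning this worker thread in a sleep loop */
  int32_t code = queueWrite(outputQ, block);
  int32_t total = queueNumOfItems(outputQ);  /* kept for logging */
  (void)total;
  return code;
}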
@@ -59,6 +59,9 @@ class TDTestCase:
         self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} partition by {partition} {partition_elm_alias} state_window({stream_state_window})', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
         range_times = self.tdCom.range_count
         state_window_max = self.tdCom.dataDict['state_window_max']
+
+        time.sleep(2)
+
         for i in range(range_times):
             state_window_value = random.randint(int((i)*state_window_max/range_times), int((i+1)*state_window_max/range_times))
             for i in range(2, range_times+3):