From 57b8388dad1f027bbf96200eaa08ff23fd2bec9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 17:23:34 +0800 Subject: [PATCH] fix(stream): fix error. --- source/libs/stream/src/streamExec.c | 18 +++++++++--------- source/util/src/tdecompress.c | 10 ++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8b14846414..e0ee01d345 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -252,14 +252,15 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.pExecutor; - bool finished = false; + void* exec = pTask->exec.pExecutor; + bool finished = false; + const char* id = pTask->id.idStr; qSetStreamOpOpen(exec); while (1) { if (streamTaskShouldPause(pTask)) { - stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr); + stDebug("s-task:%s paused from the scan-history task", id); // quit from step1, not continue to handle the step2 return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } @@ -267,8 +268,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, - tstrerror(terrno)); + stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno)); continue; } @@ -295,8 +295,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { - stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", - pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); + stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, + pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } @@ -543,7 +543,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t streamExecForAll(SStreamTask* pTask) { +int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. @@ -654,7 +654,7 @@ int32_t streamExecTask(SStreamTask* pTask) { int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); if (schedStatus == TASK_SCHED_STATUS__WAITING) { while (1) { - int32_t code = streamExecForAll(pTask); + int32_t code = doStreamExecTask(pTask); if (code < 0) { // todo this status should be removed atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index fb07773cd4..8edcc722eb 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -271,7 +271,6 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen if (nbytes1 == 0) { data1 = _mm_setzero_si128(); } else { -// _mm_shuffle_epi8() memcpy(&data1, (const void*) (input + ipos), nbytes1); } @@ -400,15 +399,14 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem __m128i prevVal = _mm_setzero_si128(); __m128i prevDelta = _mm_setzero_si128(); - // _mm_maskz_loadu_epi8 #if __AVX512VL__ - int32_t batch = nelements >> 1; - int32_t remainder = nelements & 0x01; + int32_t numOfBatch = nelements >> 1; + int32_t remainder = nelements & 0x01; __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; int32_t i = 0; - if (batch > 1) { + if (numOfBatch > 1) { // first loop uint8_t flags = input[ipos++]; @@ -446,7 +444,7 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem } // the remain - for(; i < batch; ++i) { + for(; i < numOfBatch; ++i) { uint8_t flags = input[ipos++]; int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7