fix(stream): fix error.

This commit is contained in:
Haojun Liao 2023-11-27 17:23:34 +08:00
parent d1193b5a7a
commit 6111138567
2 changed files with 13 additions and 15 deletions

View File

@ -252,14 +252,15 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
const char* id = pTask->id.idStr;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
while (1) { while (1) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr); stDebug("s-task:%s paused from the scan-history task", id);
// quit from step1, not continue to handle the step2 // quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
} }
@ -267,8 +268,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
tstrerror(terrno));
continue; continue;
} }
@ -295,8 +295,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
} }
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
} }
} }
@ -543,7 +543,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec. * appropriate batch of blocks should be handled in 5 to 10 sec.
*/ */
int32_t streamExecForAll(SStreamTask* pTask) { int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
@ -654,7 +654,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) { while (1) {
int32_t code = streamExecForAll(pTask); int32_t code = doStreamExecTask(pTask);
if (code < 0) { // todo this status should be removed if (code < 0) { // todo this status should be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1; return -1;

View File

@ -271,7 +271,6 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen
if (nbytes1 == 0) { if (nbytes1 == 0) {
data1 = _mm_setzero_si128(); data1 = _mm_setzero_si128();
} else { } else {
// _mm_shuffle_epi8()
memcpy(&data1, (const void*) (input + ipos), nbytes1); memcpy(&data1, (const void*) (input + ipos), nbytes1);
} }
@ -400,15 +399,14 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem
__m128i prevVal = _mm_setzero_si128(); __m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128(); __m128i prevDelta = _mm_setzero_si128();
// _mm_maskz_loadu_epi8
#if __AVX512VL__ #if __AVX512VL__
int32_t batch = nelements >> 1; int32_t numOfBatch = nelements >> 1;
int32_t remainder = nelements & 0x01; int32_t remainder = nelements & 0x01;
__mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff};
int32_t i = 0; int32_t i = 0;
if (batch > 1) { if (numOfBatch > 1) {
// first loop // first loop
uint8_t flags = input[ipos++]; uint8_t flags = input[ipos++];
@ -446,7 +444,7 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem
} }
// the remain // the remain
for(; i < batch; ++i) { for(; i < numOfBatch; ++i) {
uint8_t flags = input[ipos++]; uint8_t flags = input[ipos++];
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7