fix(stream): fix error.

This commit is contained in:
Haojun Liao 2023-11-27 17:23:34 +08:00
parent 2bf4382968
commit 57b8388dad
2 changed files with 13 additions and 15 deletions

View File

@ -252,14 +252,15 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
void* exec = pTask->exec.pExecutor;
bool finished = false;
void* exec = pTask->exec.pExecutor;
bool finished = false;
const char* id = pTask->id.idStr;
qSetStreamOpOpen(exec);
while (1) {
if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr);
stDebug("s-task:%s paused from the scan-history task", id);
// quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
}
@ -267,8 +268,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr,
tstrerror(terrno));
stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno));
continue;
}
@ -295,8 +295,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
}
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms",
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
}
}
@ -543,7 +543,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
*/
int32_t streamExecForAll(SStreamTask* pTask) {
int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue.
@ -654,7 +654,7 @@ int32_t streamExecTask(SStreamTask* pTask) {
int8_t schedStatus = streamTaskSetSchedStatusActive(pTask);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) {
int32_t code = streamExecForAll(pTask);
int32_t code = doStreamExecTask(pTask);
if (code < 0) { // todo this status should be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;

View File

@ -271,7 +271,6 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen
if (nbytes1 == 0) {
data1 = _mm_setzero_si128();
} else {
// _mm_shuffle_epi8()
memcpy(&data1, (const void*) (input + ipos), nbytes1);
}
@ -400,15 +399,14 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem
__m128i prevVal = _mm_setzero_si128();
__m128i prevDelta = _mm_setzero_si128();
// _mm_maskz_loadu_epi8
#if __AVX512VL__
int32_t batch = nelements >> 1;
int32_t remainder = nelements & 0x01;
int32_t numOfBatch = nelements >> 1;
int32_t remainder = nelements & 0x01;
__mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff};
int32_t i = 0;
if (batch > 1) {
if (numOfBatch > 1) {
// first loop
uint8_t flags = input[ipos++];
@ -446,7 +444,7 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem
}
// the remain
for(; i < batch; ++i) {
for(; i < numOfBatch; ++i) {
uint8_t flags = input[ipos++];
int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7