diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cf9fc1d826..b8ceb31b80 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -664,6 +664,8 @@ typedef struct STaskStatusEntry { int32_t relatedHTask; // has related fill-history task int64_t activeCheckpointId; // current active checkpoint id bool checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; double sinkQuota; // existed quota size for sink task diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index db013017e3..5490d53587 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2790,7 +2790,7 @@ static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) { static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { - if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { int32_t numOfReady = 0; int32_t numOfTotal = 0; for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { @@ -2933,6 +2933,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { bool snodeChanged = false; for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); @@ -2941,8 +2942,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); - if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true; + if(pTaskEntry->nodeId == SNODE_HANDLE) { + snodeChanged = true; + } } else { + // task is idle for more than 50 sec. + if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging) { + pTaskEntry->inputQUnchangeCounter++; + } else { + pTaskEntry->inputQChanging = false; + } + } else { + pTaskEntry->inputQChanging = true; + pTaskEntry->inputQUnchangeCounter = 0; + } + streamTaskStatusCopy(pTaskEntry, p); if (p->activeCheckpointId != 0) { if (activeCheckpointId != 0) { diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 68841941db..19f4e1fd92 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, int64_t prevValue = 0; #if __AVX2__ - while (1) { - if (_pos == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); + while (_pos < nelements) { + uint64_t w = *(uint64_t*) ip; char selector = (char)(w & INT64MASK(4)); // selector = 4 char bit = bit_per_integer[(int32_t)selector]; // bit = 3 @@ -261,17 +258,20 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen int64_t deltaOfDelta = 0; int32_t longBytes = LONG_BYTES; + // _mm_maskz_loadu_epi8 #if __AVX2__ - int32_t batch = nelements >> 2; - int32_t remainder = nelements & 0x1; + // _mm_blendv_epi8 + int32_t batch = nelements >> 4; + int32_t remainder = nelements & 0x03; - while (1) { + for(int32_t i = 0; i < batch; ++i) { uint8_t flags = input[ipos++]; // Decode dd1 uint64_t dd1 = 0; - nbytes = flags & INT8MASK(4); + nbytes = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + // __m128i mask = {};//[0], [] if (nbytes == 0) { deltaOfDelta = 0; @@ -289,10 +289,6 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen prevValue += prevDelta; ostream[opos++] = prevValue; - if (opos == nelements) { - return nelements * longBytes; - } - // Decode dd2 uint64_t dd2 = 0; nbytes = (flags >> 4) & INT8MASK(4); @@ -317,6 +313,11 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen return nelements * longBytes; } } + + if (remainder > 0) { + + } + #endif return 0; } \ No newline at end of file