diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5fe92570c9..5cb8c8aa32 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -664,6 +664,8 @@ typedef struct STaskStatusEntry { int32_t relatedHTask; // has related fill-history task int64_t activeCheckpointId; // current active checkpoint id bool checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; double sinkQuota; // existed quota size for sink task diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 36d48144bc..7c1c809441 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2767,7 +2767,7 @@ static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) { static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { - if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { int32_t numOfReady = 0; int32_t numOfTotal = 0; for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { @@ -2909,6 +2909,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); @@ -2917,7 +2918,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); + if(pTaskEntry->nodeId == SNODE_HANDLE) { + snodeChanged = true; + } } else { + // task is idle for more than 50 sec. + if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging) { + pTaskEntry->inputQUnchangeCounter++; + } else { + pTaskEntry->inputQChanging = false; + } + } else { + pTaskEntry->inputQChanging = true; + pTaskEntry->inputQUnchangeCounter = 0; + } + streamTaskStatusCopy(pTaskEntry, p); if (p->activeCheckpointId != 0) { if (activeCheckpointId != 0) { diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index f32a4014d6..e31d8f9079 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, int64_t prev_value = 0; #if __AVX2__ - while (1) { - if (_pos == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); + while (_pos < nelements) { + uint64_t w = *(uint64_t*) ip; char selector = (char)(w & INT64MASK(4)); // selector = 4 char bit = bit_per_integer[(int32_t)selector]; // bit = 3 @@ -246,6 +243,81 @@ int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelem // todo add later int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) { #if __AVX2__ +#endif + return 0; +} + +int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { + int64_t *ostream = (int64_t *)output; + int32_t ipos = 1, opos = 0; + int8_t nbytes = 0; + + int64_t prevValue = 0; + int64_t prevDelta = 0; + + int64_t deltaOfDelta = 0; + int32_t longBytes = LONG_BYTES; + + // _mm_maskz_loadu_epi8 +#if __AVX2__ + + // _mm_blendv_epi8 + int32_t batch = nelements >> 4; + int32_t remainder = nelements & 0x03; + + for(int32_t i = 0; i < batch; ++i) { + uint8_t flags = input[ipos++]; + + // Decode dd1 + uint64_t dd1 = 0; + nbytes = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + // __m128i mask = {};//[0], [] + + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + if (bigEndian) { + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd1, input + ipos, nbytes); + } + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1); + } + + ipos += nbytes; + prevDelta += deltaOfDelta; + prevValue += prevDelta; + ostream[opos++] = prevValue; + + // Decode dd2 + uint64_t dd2 = 0; + nbytes = (flags >> 4) & INT8MASK(4); + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + if (bigEndian) { + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd2, input + ipos, nbytes); + } + // zigzag_decoding + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2); + } + + ipos += nbytes; + prevDelta += deltaOfDelta; + prevValue += prevDelta; + ostream[opos++] = prevValue; + + if (opos == nelements) { + return nelements * longBytes; + } + } + + if (remainder > 0) { + + } + #endif return 0; } \ No newline at end of file