fix(stream): fix status check for stream-scan-history status when drop stream task msg is missing.

This commit is contained in:
Haojun Liao 2023-11-24 14:44:00 +08:00
parent 9d735e7f8b
commit 8515a2f9b6
3 changed files with 96 additions and 6 deletions

View File

@ -664,6 +664,8 @@ typedef struct STaskStatusEntry {
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task

View File

@ -2767,7 +2767,7 @@ static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) {
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
int32_t numOfReady = 0;
int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
@ -2909,6 +2909,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
@ -2917,7 +2918,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if(pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++;
} else {
pTaskEntry->inputQChanging = false;
}
} else {
pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0;
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {

View File

@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int64_t prev_value = 0;
#if __AVX2__
while (1) {
if (_pos == nelements) break;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
while (_pos < nelements) {
uint64_t w = *(uint64_t*) ip;
char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
@ -249,3 +246,78 @@ int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelemen
#endif
return 0;
}
int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) {
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
int8_t nbytes = 0;
int64_t prevValue = 0;
int64_t prevDelta = 0;
int64_t deltaOfDelta = 0;
int32_t longBytes = LONG_BYTES;
// _mm_maskz_loadu_epi8
#if __AVX2__
// _mm_blendv_epi8
int32_t batch = nelements >> 4;
int32_t remainder = nelements & 0x03;
for(int32_t i = 0; i < batch; ++i) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
// __m128i mask = {};//[0], []
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
if (bigEndian) {
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd1, input + ipos, nbytes);
}
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
if (nbytes == 0) {
deltaOfDelta = 0;
} else {
if (bigEndian) {
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
} else {
memcpy(&dd2, input + ipos, nbytes);
}
// zigzag_decoding
deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2);
}
ipos += nbytes;
prevDelta += deltaOfDelta;
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
}
if (remainder > 0) {
}
#endif
return 0;
}