fix(stream): fix status check for stream-scan-history status when drop stream task msg is missing.

This commit is contained in:
Haojun Liao 2023-11-24 14:44:00 +08:00
parent 2e922fefc6
commit f28cf740b0
3 changed files with 33 additions and 15 deletions

View File

@ -664,6 +664,8 @@ typedef struct STaskStatusEntry {
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id
bool checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB
double inputRate;
double sinkQuota; // existed quota size for sink task

View File

@ -2790,7 +2790,7 @@ static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) {
static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
int32_t numOfReady = 0;
int32_t numOfTotal = 0;
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
@ -2933,6 +2933,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
bool snodeChanged = false;
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
@ -2941,8 +2942,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true;
if(pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++;
} else {
pTaskEntry->inputQChanging = false;
}
} else {
pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0;
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
if (activeCheckpointId != 0) {

View File

@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int64_t prevValue = 0;
#if __AVX2__
while (1) {
if (_pos == nelements) break;
uint64_t w = 0;
memcpy(&w, ip, LONG_BYTES);
while (_pos < nelements) {
uint64_t w = *(uint64_t*) ip;
char selector = (char)(w & INT64MASK(4)); // selector = 4
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
@ -261,17 +258,20 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen
int64_t deltaOfDelta = 0;
int32_t longBytes = LONG_BYTES;
// _mm_maskz_loadu_epi8
#if __AVX2__
int32_t batch = nelements >> 2;
int32_t remainder = nelements & 0x1;
// _mm_blendv_epi8
int32_t batch = nelements >> 4;
int32_t remainder = nelements & 0x03;
while (1) {
for(int32_t i = 0; i < batch; ++i) {
uint8_t flags = input[ipos++];
// Decode dd1
uint64_t dd1 = 0;
nbytes = flags & INT8MASK(4);
nbytes = flags & INT8MASK(4); // range of nbytes starts from 0 to 7
// __m128i mask = {};//[0], []
if (nbytes == 0) {
deltaOfDelta = 0;
@ -289,10 +289,6 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen
prevValue += prevDelta;
ostream[opos++] = prevValue;
if (opos == nelements) {
return nelements * longBytes;
}
// Decode dd2
uint64_t dd2 = 0;
nbytes = (flags >> 4) & INT8MASK(4);
@ -317,6 +313,11 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen
return nelements * longBytes;
}
}
if (remainder > 0) {
}
#endif
return 0;
}