diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 64adf53bc8..1b30fdcb01 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -832,7 +832,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common -void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); +void streamTaskPause(SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7deebf3b0f..2352d3a555 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -989,7 +989,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - streamTaskPause(pMeta, pTask); + streamTaskPause(pTask); SStreamTask* pHistoryTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1006,7 +1006,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pMeta, pHistoryTask); + streamTaskPause(pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 1ea037b4e9..93099a9aa7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1610,11 +1610,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB doUnpinSttBlock(pSttBlockReader); if (hasVal) { SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader); - - if (IS_VAR_DATA_TYPE(pSttKey->pks[0].type)) { - tsdbInfo("current pk:%s, next pk:%s", pSttKey->pks[0].pData, pNext->pks[0].pData); - } - if (pkCompEx(pSttKey, pNext) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); *copied = (code == TSDB_CODE_SUCCESS); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 56f2ccd630..cd1a5f1122 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2441,7 +2441,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { int32_t rowLen = 0; for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - rowLen += pc->pExpr->base.resSchema.bytes; + rowLen += pc->resDataInfo.interBufSize; } pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b40bdb2ced..1ae5d54b1d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -912,7 +912,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { return TSDB_CODE_SUCCESS; } -void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { +void streamTaskPause(SStreamTask* pTask) { streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); } @@ -983,19 +983,27 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf return TSDB_CODE_SUCCESS; } +static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) { + for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); + if (p->taskId == taskId) { + return p; + } + } + + return NULL; +} + int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; taosThreadMutexLock(&pInfo->checkInfoLock); - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->taskId == taskId) { - stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; } taosArrayPush(pInfo->pList, &info); @@ -1008,30 +1016,29 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t int32_t* pNotReady, const char* id) { taosThreadMutexLock(&pInfo->checkInfoLock); - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->taskId == taskId) { - - if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 - " expired check-rsp recv from downstream task:0x%x, discarded", - id, reqId, p->reqId, taskId); - return TSDB_CODE_FAILED; - } - - // subtract one not-ready-task, since it is ready now - if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { - *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); - } else { - *pNotReady = pInfo->notReadyTasks; - } - - p->status = status; - p->rspTs = rspTs; + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + if (reqId != p->reqId) { + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 + " expired check-rsp recv from downstream task:0x%x, discarded", + id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_FAILED; } + + // subtract one not-ready-task, since it is ready now + if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { + *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); + } else { + *pNotReady = pInfo->notReadyTasks; + } + + p->status = status; + p->rspTs = rspTs; + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; } taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -1111,6 +1118,31 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { } } +static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p->status == TASK_DOWNSTREAM_READY) { + (*numOfReady) += 1; + } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { + stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, + p->taskId); + (*numOfFault) += 1; + } else { // TASK_DOWNSTREAM_NOT_READY + if (p->rspTs == 0) { // not response yet + ASSERT(p->status == -1); + if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. + taosArrayPush(pTimeoutList, &p->taskId); + } else { // el < CHECK_NOT_RSP_DURATION + (*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp + } + } else { + taosArrayPush(pNotReadyList, &p->taskId); + } + } + } +} + static void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamTaskState* pStat = streamTaskGetStatus(pTask); @@ -1140,14 +1172,13 @@ static void rspMonitorFn(void* param, void* tmrId) { return; } - if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); taosThreadMutexLock(&pInfo->checkInfoLock); streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - return; } @@ -1166,27 +1197,7 @@ static void rspMonitorFn(void* param, void* tmrId) { SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (pStat->state == TASK_STATUS__UNINIT) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->status == TASK_DOWNSTREAM_READY) { - numOfReady += 1; - } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, - p->taskId); - numOfFault += 1; - } else { // TASK_DOWNSTREAM_NOT_READY - if (p->rspTs == 0) { // not response yet - ASSERT(p->status == -1); - if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. - taosArrayPush(pTimeoutList, &p->taskId); - } else { // el < CHECK_NOT_RSP_DURATION - numOfNotRsp += 1; // do nothing and continue waiting for their rsp - } - } else { - taosArrayPush(pNotReadyList, &p->taskId); - } - } - } + getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } @@ -1203,11 +1214,11 @@ static void rspMonitorFn(void* param, void* tmrId) { "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); + taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - - streamTaskCompleteCheckRsp(pInfo, id); return; } @@ -1228,6 +1239,9 @@ static void rspMonitorFn(void* param, void* tmrId) { STaskId* pHId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); return; } @@ -1238,13 +1252,11 @@ static void rspMonitorFn(void* param, void* tmrId) { for (int32_t i = 0; i < numOfNotReady; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); - for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); - if (p->taskId == taskId) { - p->rspTs = 0; - p->status = -1; - doSendCheckMsg(pTask, p); - } + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); } } @@ -1258,13 +1270,10 @@ static void rspMonitorFn(void* param, void* tmrId) { for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); - for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); - if (p->taskId == taskId) { - ASSERT(p->status == -1 && p->rspTs == 0); - doSendCheckMsg(pTask, p); - break; - } + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); } }