From 34048937a4d908428b23e3b72d79093b5e81b8c0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 10:03:01 +0800 Subject: [PATCH 1/4] fix(stream): check req id to remove expired check rsp . --- source/libs/stream/src/streamStart.c | 18 +++++++++++++----- source/libs/stream/src/streamTask.c | 13 ++++++++++--- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index cc1987492c..229648cf3c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -195,7 +195,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 - " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64, + " window:%" PRId64 "-%" PRId64 " reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); @@ -218,8 +218,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 + " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { // for sink task, set it ready directly. @@ -395,7 +396,10 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } if (pRsp->status == TASK_DOWNSTREAM_READY) { - streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; + } if (left == 0) { doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag @@ -405,7 +409,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else { // not ready, wait for 100ms and retry - streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_SUCCESS; // return success in any cases. + } + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b87c19b08e..b40bdb2ced 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1011,9 +1011,15 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); if (p->taskId == taskId) { - ASSERT(reqId == p->reqId); - // count down one, since it is ready now + if (reqId != p->reqId) { + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 + " expired check-rsp recv from downstream task:0x%x, discarded", + id, reqId, p->reqId, taskId); + return TSDB_CODE_FAILED; + } + + // subtract one not-ready-task, since it is ready now if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); } else { @@ -1029,7 +1035,8 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t } taosThreadMutexUnlock(&pInfo->checkInfoLock); - stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId); + stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, reqId:%" PRIx64 " discarded", id, taskId, + reqId); return TSDB_CODE_FAILED; } From 1adf59e7d8bbced21695e436ce5627f812acbfb9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 13:53:23 +0800 Subject: [PATCH 2/4] fix(stream): do check when task is paused, and do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 5 - source/libs/function/src/builtinsimpl.c | 2 +- source/libs/stream/src/streamTask.c | 147 +++++++++++---------- 5 files changed, 82 insertions(+), 78 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 64adf53bc8..1b30fdcb01 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -832,7 +832,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common -void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask); +void streamTaskPause(SStreamTask* pTask); void streamTaskResume(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7deebf3b0f..2352d3a555 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -989,7 +989,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { } tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); - streamTaskPause(pMeta, pTask); + streamTaskPause(pTask); SStreamTask* pHistoryTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1006,7 +1006,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) { tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); - streamTaskPause(pMeta, pHistoryTask); + streamTaskPause(pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 1ea037b4e9..93099a9aa7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1610,11 +1610,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB doUnpinSttBlock(pSttBlockReader); if (hasVal) { SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader); - - if (IS_VAR_DATA_TYPE(pSttKey->pks[0].type)) { - tsdbInfo("current pk:%s, next pk:%s", pSttKey->pks[0].pData, pNext->pks[0].pData); - } - if (pkCompEx(pSttKey, pNext) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); *copied = (code == TSDB_CODE_SUCCESS); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 56f2ccd630..cd1a5f1122 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2441,7 +2441,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { int32_t rowLen = 0; for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - rowLen += pc->pExpr->base.resSchema.bytes; + rowLen += pc->resDataInfo.interBufSize; } pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b40bdb2ced..1ae5d54b1d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -912,7 +912,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { return TSDB_CODE_SUCCESS; } -void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) { +void streamTaskPause(SStreamTask* pTask) { streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL); } @@ -983,19 +983,27 @@ static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf return TSDB_CODE_SUCCESS; } +static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId) { + for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); + if (p->taskId == taskId) { + return p; + } + } + + return NULL; +} + int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) { SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0}; taosThreadMutexLock(&pInfo->checkInfoLock); - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->taskId == taskId) { - stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; - } + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + stDebug("s-task:%s check info to task:0x%x already sent", id, taskId); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; } taosArrayPush(pInfo->pList, &info); @@ -1008,30 +1016,29 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t int32_t* pNotReady, const char* id) { taosThreadMutexLock(&pInfo->checkInfoLock); - for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->taskId == taskId) { - - if (reqId != p->reqId) { - stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 - " expired check-rsp recv from downstream task:0x%x, discarded", - id, reqId, p->reqId, taskId); - return TSDB_CODE_FAILED; - } - - // subtract one not-ready-task, since it is ready now - if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { - *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); - } else { - *pNotReady = pInfo->notReadyTasks; - } - - p->status = status; - p->rspTs = rspTs; + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + if (reqId != p->reqId) { + stError("s-task:%s reqId:%" PRIx64 " expected:%" PRIx64 + " expired check-rsp recv from downstream task:0x%x, discarded", + id, reqId, p->reqId, taskId); taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_FAILED; } + + // subtract one not-ready-task, since it is ready now + if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { + *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); + } else { + *pNotReady = pInfo->notReadyTasks; + } + + p->status = status; + p->rspTs = rspTs; + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; } taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -1111,6 +1118,31 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { } } +static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, + int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { + SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); + if (p->status == TASK_DOWNSTREAM_READY) { + (*numOfReady) += 1; + } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { + stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, + p->taskId); + (*numOfFault) += 1; + } else { // TASK_DOWNSTREAM_NOT_READY + if (p->rspTs == 0) { // not response yet + ASSERT(p->status == -1); + if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. + taosArrayPush(pTimeoutList, &p->taskId); + } else { // el < CHECK_NOT_RSP_DURATION + (*numOfNotRsp) += 1; // do nothing and continue waiting for their rsp + } + } else { + taosArrayPush(pNotReadyList, &p->taskId); + } + } + } +} + static void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamTaskState* pStat = streamTaskGetStatus(pTask); @@ -1140,14 +1172,13 @@ static void rspMonitorFn(void* param, void* tmrId) { return; } - if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); taosThreadMutexLock(&pInfo->checkInfoLock); streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - return; } @@ -1166,27 +1197,7 @@ static void rspMonitorFn(void* param, void* tmrId) { SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (pStat->state == TASK_STATUS__UNINIT) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); - if (p->status == TASK_DOWNSTREAM_READY) { - numOfReady += 1; - } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status:NEW_STAGE/NOT_LEADER from downstream, task:0x%x, quit from check downstream", id, - p->taskId); - numOfFault += 1; - } else { // TASK_DOWNSTREAM_NOT_READY - if (p->rspTs == 0) { // not response yet - ASSERT(p->status == -1); - if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. - taosArrayPush(pTimeoutList, &p->taskId); - } else { // el < CHECK_NOT_RSP_DURATION - numOfNotRsp += 1; // do nothing and continue waiting for their rsp - } - } else { - taosArrayPush(pNotReadyList, &p->taskId); - } - } - } + getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } @@ -1203,11 +1214,11 @@ static void rspMonitorFn(void* param, void* tmrId) { "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); + taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - - streamTaskCompleteCheckRsp(pInfo, id); return; } @@ -1228,6 +1239,9 @@ static void rspMonitorFn(void* param, void* tmrId) { STaskId* pHId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); return; } @@ -1238,13 +1252,11 @@ static void rspMonitorFn(void* param, void* tmrId) { for (int32_t i = 0; i < numOfNotReady; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i); - for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); - if (p->taskId == taskId) { - p->rspTs = 0; - p->status = -1; - doSendCheckMsg(pTask, p); - } + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + p->rspTs = 0; + p->status = -1; + doSendCheckMsg(pTask, p); } } @@ -1258,13 +1270,10 @@ static void rspMonitorFn(void* param, void* tmrId) { for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); - for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) { - SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j); - if (p->taskId == taskId) { - ASSERT(p->status == -1 && p->rspTs == 0); - doSendCheckMsg(pTask, p); - break; - } + SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); + if (p != NULL) { + ASSERT(p->status == -1 && p->rspTs == 0); + doSendCheckMsg(pTask, p); } } From b0f68a914736d6dd1e3fdb30f14fe590bb0e73b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 14:58:51 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbCache.c | 8 ++++---- source/libs/executor/src/cachescanoperator.c | 21 ++++++++++---------- source/libs/function/src/builtinsimpl.c | 2 +- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 99f35717fe..76f98b33c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1637,8 +1637,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol lastCol = *pLastCol; - for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { - reallocVarDataVal(&lastCol.rowKey.pks[i]); + for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { + reallocVarDataVal(&lastCol.rowKey.pks[j]); } reallocVarData(&lastCol.colVal); taosArrayPush(pLastArray, &lastCol); @@ -1667,8 +1667,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol lastCol = *pLastCol; - for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) { - reallocVarDataVal(&lastCol.rowKey.pks[i]); + for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) { + reallocVarDataVal(&lastCol.rowKey.pks[j]); } reallocVarData(&lastCol.colVal); taosArraySet(pLastArray, idxKey->idx, &lastCol); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 421d153230..b7159225e1 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -217,6 +217,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SCacheRowsScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableListInfo* pTableList = pInfo->pTableList; + SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); @@ -237,8 +238,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pBufferedRes); taosArrayClear(pInfo->pUidList); - int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, - pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); + int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, + pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -307,10 +308,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } if (NULL == pInfo->pLastrowReader) { - code = pInfo->readHandle.api.cacheFn.openReader( - pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList), - pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, - &pInfo->pkCol, pInfo->numOfPks); + code = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, + taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, + &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol, + pInfo->numOfPks); if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1; @@ -318,13 +319,13 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { continue; } } else { - pInfo->readHandle.api.cacheFn.reuseReader(pInfo->pLastrowReader, pList, num); + pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num); } taosArrayClear(pInfo->pUidList); - code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds, - pInfo->pUidList); + code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds, + pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -357,7 +358,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - pInfo->pLastrowReader = pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader); + pInfo->pLastrowReader = pReaderFn->closeReader(pInfo->pLastrowReader); setOperatorCompleted(pOperator); return NULL; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index cd1a5f1122..56f2ccd630 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2441,7 +2441,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { int32_t rowLen = 0; for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - rowLen += pc->resDataInfo.interBufSize; + rowLen += pc->pExpr->base.resSchema.bytes; } pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool); From 10c792efa2203ffaa7d836e0c9ad218c67e92820 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 16:55:47 +0800 Subject: [PATCH 4/4] fix(stream):add related fill-history task to be done, when check-rsp moniting quit. --- source/libs/stream/src/streamStart.c | 2 +- source/libs/stream/src/streamTask.c | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 229648cf3c..0b91359f48 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -219,7 +219,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 - " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:%" PRIx64, + " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 1ae5d54b1d..2bc09c1c22 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1091,7 +1091,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { req.reqId = p->reqId; req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64, + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); @@ -1107,8 +1107,10 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d", - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 + " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, + p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); break; } @@ -1158,7 +1160,7 @@ static void rspMonitorFn(void* param, void* tmrId) { int32_t numOfNotReady = 0; int32_t numOfTimeout = 0; - stDebug("s-task:%s start to do check downstream rsp check", id); + stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -1169,6 +1171,10 @@ static void rspMonitorFn(void* param, void* tmrId) { taosThreadMutexUnlock(&pInfo->checkInfoLock); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pHId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); + } return; } @@ -1207,7 +1213,7 @@ static void rspMonitorFn(void* param, void* tmrId) { // fault tasks detected, not try anymore ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); - if ((numOfNotRsp == 0) && (numOfFault > 0)) { + if (numOfFault > 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "