diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index dff918f135..5c3167dd79 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -310,6 +310,8 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { mError("failed to process show-retrieve req:%p since %s", pShow, terrstr()); return -1; } + + pShow->numOfReads = 0; } ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type]; @@ -374,7 +376,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { pReq->pRsp = pRsp; pReq->rspLen = size; - if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { + if (rowsRead == 0 || rowsToRead == 0 || (rowsRead < rowsToRead)) { pRsp->completed = 1; mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); mndReleaseShowObj((SShowObj*) pShow, true); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7799ac7562..02e09252f9 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1608,7 +1608,6 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 SStbObj *pStb = NULL; int32_t cols = 0; char *pWrite; - char prefix[TSDB_DB_FNAME_LEN] = {0}; SDbObj* pDb = NULL; if (strlen(pShow->db) > 0) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9c0b0802ab..344b4fa655 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -167,7 +167,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { + if (colDataAppend(pColData, curRow, sVal.val, false) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 598647f797..d377ec2039 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId); + return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); case TDMT_VND_STREAM_TRIGGER: - return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId); + return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index bcbfeb7015..5d5e0cae88 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -156,6 +156,8 @@ SArray* interResFromBinary(const char* data, int32_t len); void freeInterResult(void* param); void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo); +void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); + void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo); bool hasRemainData(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1d7023930d..ce0069fdfa 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -422,6 +422,7 @@ typedef struct SStreamBlockScanInfo { uint64_t numOfRows; // total scanned rows uint64_t numOfExec; // execution times void* readerHandle; // stream block reader handle + SArray* pColMatchInfo; // } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 2c6468a13f..93fbfe43ce 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -222,6 +222,16 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } +void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { + if (pGroupResInfo->pRows != NULL) { + taosArrayDestroy(pGroupResInfo->pRows); + } + + pGroupResInfo->pRows = pArrayList; + pGroupResInfo->index = 0; + ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); +} + bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->pRows == NULL) { return false; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2df18f135c..6265fe6123 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -352,7 +352,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { continue; } - idata.info.type = pDescNode->dataType.type; + idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; idata.info.scale = pDescNode->dataType.scale; idata.info.slotId = pDescNode->slotId; @@ -1551,8 +1551,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); tsCols = (int64_t*)pColDataInfo->pData; - assert(tsCols[0] == pSDataBlock->info.window.skey && - tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); +// assert(tsCols[0] == pSDataBlock->info.window.skey && +// tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1); @@ -3660,6 +3660,7 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased if (pCtx[j].fpSet.process) { // TODO set the dummy function. pCtx[j].fpSet.finalize(&pCtx[j]); + pResInfo->initialized = true; } if (pRow->numOfRows < pResInfo->numOfRes) { @@ -3778,10 +3779,6 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S continue; } - // int32_t functionId = pCtx[i].functionId; - // if (functionId < 0) { - // continue; - // } // if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { // if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; // } @@ -4971,7 +4968,20 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) return NULL; } - pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle); + SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle); + + int32_t numOfCols = pInfo->pRes->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pCols, i); + SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + + ASSERT(pColMatchInfo->colId == p->info.colId); + taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p); + } + if (pInfo->pRes->pDataBlock == NULL) { // TODO add log pTaskInfo->code = terrno; @@ -5625,8 +5635,18 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* return NULL; } + int32_t numOfOutput = taosArrayGetSize(pColList); + + SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); + for(int32_t i = 0; i < numOfOutput; ++i) { + int16_t* id = taosArrayGet(pColList, i); + taosArrayPush(pColIds, id); + } + + pInfo->pColMatchInfo = pColList; + // set the extract column id to streamHandle - tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColList); + tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds); int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); if (code != 0) { taosMemoryFreeClear(pInfo); @@ -5665,9 +5685,9 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->useconds = htobe64(pRsp->useconds); - pRsp->handle = htobe64(pRsp->handle); - pRsp->compLen = htonl(pRsp->compLen); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->handle = htobe64(pRsp->handle); + pRsp->compLen = htonl(pRsp->compLen); } else { operator->pTaskInfo->code = code; } @@ -5824,6 +5844,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { // pInfo->totalBytes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } else { // load the meta from mnode of the given epset + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + int64_t startTs = taosGetTimestampUs(); pInfo->req.type = pInfo->type; @@ -5863,6 +5887,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; pInfo->req.showId = pRsp->handle; + if (pRsp->numOfRows == 0 || pRsp->completed) { + pOperator->status = OP_EXEC_DONE; + } + if (pRsp->numOfRows == 0) { // qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" // try next", @@ -6964,7 +6992,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pInfo->binfo.pRes; + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } // STimeWindow win = {0}; @@ -6993,6 +7021,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); @@ -8792,10 +8821,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc); - SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); - SOperatorInfo* pOperator = - createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo); + int32_t numOfCols = 0; + SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); + SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index fb6c0f6c12..5223aa1e3a 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,6 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { // + blockDebugShowData(pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); //