diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index cd726e0a0e..932ad30b1a 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -72,7 +72,7 @@ SHOW STREAMS; 若要展示更详细的信息,可以使用: ```sql -SELECT * from performance_schema.`perf_streams`; +SELECT * from information_schema.`ins_streams`; ``` ## 流式计算的触发模式 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ecd1b6f916..4099551188 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -140,15 +140,40 @@ typedef struct { int8_t type; } SStreamCheckpoint; -typedef struct { - int8_t type; -} SStreamTaskDestroy; - typedef struct { int8_t type; SSDataBlock* pBlock; } SStreamTrigger; +typedef struct SStreamQueueNode SStreamQueueNode; + +struct SStreamQueueNode { + SStreamQueueItem* item; + SStreamQueueNode* next; +}; + +typedef struct { + SStreamQueueNode* head; + int64_t size; +} SStreamQueueRes; + +void streamFreeQitem(SStreamQueueItem* data); + +bool streamQueueResEmpty(const SStreamQueueRes* pRes); +int64_t streamQueueResSize(const SStreamQueueRes* pRes); +SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes); +SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes); +void streamQueueResClear(SStreamQueueRes* pRes); +SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode); + +typedef struct { + SStreamQueueNode* pHead; +} SStreamQueue1; + +bool streamQueueHasTask(const SStreamQueue1* pQueue); +int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); +SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); + typedef struct { STaosQueue* queue; STaosQall* qall; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 300251d64d..58f8172282 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -324,15 +324,15 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqHbReq req = {0}; + SMnode *pMnode = pMsg->info.node; + SMqHbReq req = {0}; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int64_t consumerId = req.consumerId; + int64_t consumerId = req.consumerId; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer %" PRId64 " not exist", consumerId); @@ -363,17 +363,17 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; - SMqAskEpReq req = {0}; - SMqAskEpRsp rsp = {0}; + SMnode *pMnode = pMsg->info.node; + SMqAskEpReq req = {0}; + SMqAskEpRsp rsp = {0}; if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int64_t consumerId = req.consumerId; - int32_t epoch = req.epoch; + int64_t consumerId = req.consumerId; + int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { @@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { if (topicEp.vgs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosRUnLockLatch(&pConsumer->lock); + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e8428ea470..3c1d3f09bf 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDbObj != NULL); - sdbRelease(pSdb, pDbObj); bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + sdbRelease(pSdb, pDbObj); if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { /*if (true) {*/ @@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { - ASSERT(0); terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6c1c552ccb..61f027039d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - ASSERT(0); return -1; } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index cde8346487..82398d6e34 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -915,33 +915,39 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } pDest->info.rows++; if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { - SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); - SSDataBlock* pResBlock = createDataBlock(); - pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; - SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); - taosArrayPush(pResBlock->pDataBlock, &data); - blockDataEnsureCapacity(pResBlock, 1); - projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); - ASSERT(pResBlock->info.rows == 1); - ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); - SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); - ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); - void* pData = colDataGetVarData(pCol, 0); - // TODO check tbname validity - if (pData != (void*)-1) { - memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); - memcpy(pDest->info.parTbName, varDataVal(pData), len); - /*pDest->info.parTbName[len + 1] = 0;*/ + void* tbname = NULL; + if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { + memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + tdbFree(tbname); } else { - pDest->info.parTbName[0] = 0; + SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); + SSDataBlock* pResBlock = createDataBlock(); + pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; + SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0); + taosArrayPush(pResBlock->pDataBlock, &data); + blockDataEnsureCapacity(pResBlock, 1); + projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL); + ASSERT(pResBlock->info.rows == 1); + ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); + ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); + void* pData = colDataGetVarData(pCol, 0); + // TODO check tbname validity + if (pData != (void*)-1) { + memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); + memcpy(pDest->info.parTbName, varDataVal(pData), len); + /*pDest->info.parTbName[len + 1] = 0;*/ + } else { + pDest->info.parTbName[0] = 0; + } + if (pParInfo->groupId && pDest->info.parTbName[0]) { + streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); + } + /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ + blockDataDestroy(pTmpBlock); + blockDataDestroy(pResBlock); } - if (pParInfo->groupId && pDest->info.parTbName[0]) { - streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); - } - /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ - blockDataDestroy(pTmpBlock); - blockDataDestroy(pResBlock); } } taosArrayDestroy(pParInfo->rowIds); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 43d09dcac6..4c23d17ac7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, - GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, + buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; @@ -306,7 +306,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; pCost->totalBlocks += 1; @@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); varDataSetLen(tbname, strlen(varDataVal(tbname))); + tdbFree(parTbname); } appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, tbname[0] == 0 ? NULL : tbname); @@ -1932,6 +1933,7 @@ FETCH_NEXT_BLOCK: if (pInfo->validBlockIndex >= totBlockNum) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); + qDebug("stream scan return empty, consume block %d", totBlockNum); return NULL; } @@ -2566,7 +2568,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { uint32_t status = 0; loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); -// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); + // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2897,7 +2899,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 0fc75c4798..5ff49502df 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SEpSet* pEpSet); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); -void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index ac10c82587..7eafcdc93e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) { taosCloseQueue(queue->queue); taosMemoryFree(queue); } + +bool streamQueueResEmpty(const SStreamQueueRes* pRes) { + // + return true; +} +int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; } +SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; } +SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) { + SStreamQueueNode* pRet = pRes->head; + pRes->head = pRes->head->next; + return pRet; +} + +void streamQueueResClear(SStreamQueueRes* pRes) { + while (pRes->head) { + SStreamQueueNode* pNode = pRes->head; + streamFreeQitem(pRes->head->item); + pRes->head = pNode; + } +} + +SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) { + int64_t size = 0; + SStreamQueueNode* head = NULL; + + while (pTail) { + SStreamQueueNode* pTmp = pTail->next; + pTail->next = head; + head = pTail; + pTail = pTmp; + size++; + } + + return (SStreamQueueRes){.head = head, .size = size}; +} + +bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); } +int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) { + SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode)); + pNode->item = pItem; + SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead); + while (1) { + pNode->next = pHead; + SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode); + if (pOld == pHead) { + break; + } + } + return 0; +} + +SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { + SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL); + if (pNode) return streamQueueBuildRes(pNode); + return (SStreamQueueRes){0}; +} diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim index 274476e17c..cae7a66589 100644 --- a/tests/script/tsim/show/basic.sim +++ b/tests/script/tsim/show/basic.sim @@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables if $rows != 1 then return -1 endi -#sql select * from performance_schema.perf_streams +#sql select * frominformation_schema.ins_streams sql select * from information_schema.ins_tables if $rows <= 0 then return -1