Merge pull request #18459 from taosdata/feature/stream

enh(stream): new api for stream queue
This commit is contained in:
Shengliang Guan 2022-11-26 09:26:01 +08:00 committed by GitHub
commit ccc2407fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 135 additions and 48 deletions

View File

@ -72,7 +72,7 @@ SHOW STREAMS;
若要展示更详细的信息,可以使用: 若要展示更详细的信息,可以使用:
```sql ```sql
SELECT * from performance_schema.`perf_streams`; SELECT * from information_schema.`ins_streams`;
``` ```
## 流式计算的触发模式 ## 流式计算的触发模式

View File

@ -140,15 +140,40 @@ typedef struct {
int8_t type; int8_t type;
} SStreamCheckpoint; } SStreamCheckpoint;
typedef struct {
int8_t type;
} SStreamTaskDestroy;
typedef struct { typedef struct {
int8_t type; int8_t type;
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SStreamTrigger; } SStreamTrigger;
typedef struct SStreamQueueNode SStreamQueueNode;
struct SStreamQueueNode {
SStreamQueueItem* item;
SStreamQueueNode* next;
};
typedef struct {
SStreamQueueNode* head;
int64_t size;
} SStreamQueueRes;
void streamFreeQitem(SStreamQueueItem* data);
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes);
void streamQueueResClear(SStreamQueueRes* pRes);
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode);
typedef struct {
SStreamQueueNode* pHead;
} SStreamQueue1;
bool streamQueueHasTask(const SStreamQueue1* pQueue);
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
typedef struct { typedef struct {
STaosQueue* queue; STaosQueue* queue;
STaosQall* qall; STaosQall* qall;

View File

@ -457,6 +457,8 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
if (topicEp.vgs == NULL) { if (topicEp.vgs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
goto FAIL; goto FAIL;
} }

View File

@ -317,9 +317,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDbObj != NULL); ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pDbObj);
bool multiTarget = pDbObj->cfg.numOfVgroups > 1; bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/ /*if (true) {*/
@ -451,7 +451,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
ASSERT(0);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);

View File

@ -379,7 +379,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0);
return -1; return -1;
} }
} }

View File

@ -915,6 +915,11 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
} }
pDest->info.rows++; pDest->info.rows++;
if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) { if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
void* tbname = NULL;
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
tdbFree(tbname);
} else {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex); SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
SSDataBlock* pResBlock = createDataBlock(); SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN; pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
@ -944,6 +949,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
blockDataDestroy(pResBlock); blockDataDestroy(pResBlock);
} }
} }
}
taosArrayDestroy(pParInfo->rowIds); taosArrayDestroy(pParInfo->rowIds);
pParInfo->rowIds = NULL; pParInfo->rowIds = NULL;
blockDataUpdateTsWindow(pDest, pInfo->tsColIndex); blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);

View File

@ -163,8 +163,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf, SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) { if (p1 == NULL) {
return NULL; return NULL;
@ -1312,6 +1312,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
varDataSetLen(tbname, strlen(varDataVal(tbname))); varDataSetLen(tbname, strlen(varDataVal(tbname)));
tdbFree(parTbname);
} }
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
tbname[0] == 0 ? NULL : tbname); tbname[0] == 0 ? NULL : tbname);
@ -1932,6 +1933,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->validBlockIndex >= totBlockNum) { if (pInfo->validBlockIndex >= totBlockNum) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
doClearBufferedBlocks(pInfo); doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, consume block %d", totBlockNum);
return NULL; return NULL;
} }
@ -2566,7 +2568,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
uint32_t status = 0; uint32_t status = 0;
loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -2897,7 +2899,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error; goto _error;
} }
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

View File

@ -47,7 +47,6 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
SEpSet* pEpSet); SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -45,3 +45,59 @@ void streamQueueClose(SStreamQueue* queue) {
taosCloseQueue(queue->queue); taosCloseQueue(queue->queue);
taosMemoryFree(queue); taosMemoryFree(queue);
} }
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
return true;
}
int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; }
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; }
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) {
SStreamQueueNode* pRet = pRes->head;
pRes->head = pRes->head->next;
return pRet;
}
void streamQueueResClear(SStreamQueueRes* pRes) {
while (pRes->head) {
SStreamQueueNode* pNode = pRes->head;
streamFreeQitem(pRes->head->item);
pRes->head = pNode;
}
}
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) {
int64_t size = 0;
SStreamQueueNode* head = NULL;
while (pTail) {
SStreamQueueNode* pTmp = pTail->next;
pTail->next = head;
head = pTail;
pTail = pTmp;
size++;
}
return (SStreamQueueRes){.head = head, .size = size};
}
bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); }
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) {
SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode));
pNode->item = pItem;
SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead);
while (1) {
pNode->next = pHead;
SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode);
if (pOld == pHead) {
break;
}
}
return 0;
}
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL);
if (pNode) return streamQueueBuildRes(pNode);
return (SStreamQueueRes){0};
}

View File

@ -195,7 +195,7 @@ sql select * from information_schema.ins_stables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
#sql select * from performance_schema.perf_streams #sql select * frominformation_schema.ins_streams
sql select * from information_schema.ins_tables sql select * from information_schema.ins_tables
if $rows <= 0 then if $rows <= 0 then
return -1 return -1