diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 930b7be3ef..0b647934ff 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,12 +54,12 @@ enum { enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, + STREAM_INPUT__MERGED_SUBMIT, // STREAM_INPUT__TABLE_SCAN, STREAM_INPUT__TQ_SCAN, STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, - STREAM_INPUT__DROP, }; typedef enum EStreamType { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a484492d08..f6c3b3f5b2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -77,6 +77,13 @@ typedef struct { SSubmitReq* data; } SStreamDataSubmit; +typedef struct { + int8_t type; + int64_t ver; + SArray* dataRefs; // SArray + SArray* reqs; // SArray +} SStreamMergedSubmit; + typedef struct { int8_t type; diff --git a/include/util/tarray.h b/include/util/tarray.h index 482f13de39..7c1bc34d71 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -87,6 +87,14 @@ void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfEle */ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); +/** + * + * @param pArray + * @param comparFn + * @param fp + */ +void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); + /** * add all element from the source array list into the destination * @param pArray diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index a60db8a8c2..315b7c3afc 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -452,7 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t code = -1; SArray *newSub = subscribe.topicNames; taosArraySortString(newSub, taosArrayCompareString); - taosArrayRemoveDuplicate(newSub, taosArrayCompareString, taosMemoryFree); + taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree); int32_t newTopicNum = taosArrayGetSize(newSub); // check topic existance diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2893be2adc..fd0f97a638 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -91,7 +91,7 @@ int metaBegin(SMeta* pMeta); int metaCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); -int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); +int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index e799ef4adc..5255724b58 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -212,7 +212,7 @@ _err: return -1; } -int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { +int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) { void *pKey = NULL; int nKey = 0; void *pData = NULL; @@ -228,8 +228,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { } // drop all child tables - TBC *pCtbIdxc = NULL; - SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t)); + TBC *pCtbIdxc = NULL; tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); @@ -249,20 +248,18 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { break; } - taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid)); + taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); } tdbTbcClose(pCtbIdxc); metaWLock(pMeta); - for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) { - tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild); + for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { + tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild); metaDropTableByUid(pMeta, uid, NULL); } - taosArrayDestroy(pArray); - // drop super table _drop_super_table: tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 3a6afb22ac..5172819d2a 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -49,8 +49,8 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + // TODO add reference to gurantee success if (metaGetTableEntryByUid(&mr, uid) < 0) { - ASSERT(0); return -1; } char* tbName = strdup(mr.me.name); @@ -87,16 +87,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa tqDebug("task execute end, get %p", pDataBlock); if (pDataBlock != NULL) { - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); - pRsp->blockNum++; if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader[0]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } else { pRsp->withTbName = 0; } } + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; } else { @@ -193,13 +195,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; - ASSERT(0); } - tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); if (pRsp->withTbName) { int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } + tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockSchemaToRsp(pExec, workerId, pRsp); pRsp->blockNum++; } @@ -211,13 +214,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; - ASSERT(0); } - tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); if (pRsp->withTbName) { int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } + tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock)); tqAddBlockSchemaToRsp(pExec, workerId, pRsp); pRsp->blockNum++; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 47b497b480..e4c11c4787 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -299,6 +299,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { } if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 292bc66265..b0eb7f4a14 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -178,6 +178,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pRes = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; + tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size); + ASSERT(pTask->tbSink.pTSchema); SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pTask->tbSink.stbFullName, pVnode->config.vgId); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5f730bcfa5..8e59d97286 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -557,6 +557,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe SVDropStbReq req = {0}; int32_t rcode = TSDB_CODE_SUCCESS; SDecoder decoder = {0}; + SArray *tbUidList = NULL; pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->pCont = NULL; @@ -570,7 +571,14 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe } // process request - if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { + tbUidList = taosArrayInit(8, sizeof(int64_t)); + if (tbUidList == NULL) goto _exit; + if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) { + rcode = terrno; + goto _exit; + } + + if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) { rcode = terrno; goto _exit; } @@ -582,6 +590,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe // return rsp _exit: + if (tbUidList) taosArrayDestroy(tbUidList); pRsp->code = rcode; tDecoderClear(&decoder); return 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0bf294f33d..9491c675c1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -42,17 +42,32 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // TODO: if a block was set but not consumed, // prevent setting a different type of block - pInfo->blockType = type; + pInfo->validBlockIndex = 0; + taosArrayClear(pInfo->pBlockLists); - if (type == STREAM_INPUT__DATA_SUBMIT) { - if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) { - qError("submit msg messed up when initing stream block, %s" PRIx64, id); - return TSDB_CODE_QRY_APP_ERROR; + if (type == STREAM_INPUT__MERGED_SUBMIT) { + ASSERT(numOfBlocks > 1); + for (int32_t i = 0; i < numOfBlocks; i++) { + SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); + taosArrayPush(pInfo->pBlockLists, &pReq); } + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + } else if (type == STREAM_INPUT__DATA_SUBMIT) { + /*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/ + /*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/ + /*return TSDB_CODE_QRY_APP_ERROR;*/ + /*}*/ + ASSERT(numOfBlocks == 1); + /*if (numOfBlocks == 1) {*/ + taosArrayPush(pInfo->pBlockLists, &input); + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + /*} else {*/ + /*}*/ } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); p->info = pDataBlock->info; @@ -60,6 +75,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { ASSERT(0); } @@ -167,7 +183,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { return pTaskInfo; } -static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) { +static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, + const char* idstr) { SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); // let's discard the tables those are not created according to the queried super table. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dc44de9cf3..2e78b61b8c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1392,24 +1392,49 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - blockDataCleanup(pInfo->pRes); - while (tqNextDataBlock(pInfo->tqReader)) { - SSDataBlock block = {0}; + int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists); - // todo refactor - int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); + while (1) { + if (pInfo->tqReader->pMsg == NULL) { + if (pInfo->validBlockIndex >= totBlockNum) { + return NULL; + } - if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { - pTaskInfo->code = code; - return NULL; + int32_t current = pInfo->validBlockIndex++; + SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current); + if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { + qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, + totBlockNum); + pInfo->tqReader->pMsg = NULL; + continue; + } } - setBlockIntoRes(pInfo, &block); + blockDataCleanup(pInfo->pRes); + while (tqNextDataBlock(pInfo->tqReader)) { + SSDataBlock block = {0}; + + int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); + + if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { + continue; + } + + setBlockIntoRes(pInfo, &block); + + if (pBlockInfo->rows > 0) { + break; + } + } if (pBlockInfo->rows > 0) { break; + } else { + pInfo->tqReader->pMsg = NULL; + continue; } + /*blockDataCleanup(pInfo->pRes);*/ } // record the scan action. @@ -2557,30 +2582,30 @@ typedef struct STableMergeScanInfo { SArray* pSortInfo; SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; SFileBlockLoadRecorder readRecorder; - int64_t numOfRows; - SScanInfo scanInfo; - int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer - SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context - SResultRowInfo* pResultRowInfo; - int32_t* rowEntryInfoOffset; - SExprInfo* pExpr; - SSDataBlock* pResBlock; - SArray* pColMatchInfo; - int32_t numOfOutput; + int64_t numOfRows; + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowEntryInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; SqlFunctionCtx* pPseudoCtx; SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. SInterval interval; @@ -2588,7 +2613,8 @@ typedef struct STableMergeScanInfo { } STableMergeScanInfo; int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + const char* idStr) { int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index d10ea76c83..093242c610 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); +SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 6be15222db..d476980393 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -77,6 +77,28 @@ FAIL: return NULL; } +SStreamMergedSubmit* streamMergedSubmitNew() { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); + if (pMerged == NULL) return NULL; + pMerged->reqs = taosArrayInit(0, sizeof(void*)); + pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); + if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; + pMerged->type = STREAM_INPUT__MERGED_SUBMIT; + return pMerged; +FAIL: + if (pMerged->reqs) taosArrayDestroy(pMerged->reqs); + if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs); + taosFreeQitem(pMerged); + return NULL; +} + +int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { + taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); + taosArrayPush(pMerged->reqs, &pSubmit->data); + pMerged->ver = pSubmit->ver; + return 0; +} + static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } @@ -100,15 +122,31 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { } } -int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { +SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { ASSERT(elem); - if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) { + if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem; taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); - return 0; + taosArrayDestroy(pBlockSrc->blocks); + taosFreeQitem(elem); + return dst; + } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; + SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem; + streamMergeSubmit(pMerged, pBlockSrc); + taosFreeQitem(elem); + return dst; + } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); + ASSERT(pMerged); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem); + taosFreeQitem(dst); + taosFreeQitem(elem); + return (SStreamQueueItem*)pMerged; } else { - return -1; + return NULL; } } @@ -123,5 +161,20 @@ void streamFreeQitem(SStreamQueueItem* data) { } else if (type == STREAM_INPUT__DATA_SUBMIT) { streamDataSubmitRefDec((SStreamDataSubmit*)data); taosFreeQitem(data); + } else if (type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; + int32_t sz = taosArrayGetSize(pMerge->reqs); + for (int32_t i = 0; i < sz; i++) { + int32_t* ref = taosArrayGetP(pMerge->dataRefs, i); + (*ref)--; + if (*ref == 0) { + void* data = taosArrayGetP(pMerge->reqs, i); + taosMemoryFree(data); + taosMemoryFree(ref); + } + } + taosArrayDestroy(pMerge->reqs); + taosArrayDestroy(pMerge->dataRefs); + taosFreeQitem(pMerge); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b59a812678..a8192b49f3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -33,9 +33,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) SArray* blocks = pBlock->blocks; qDebug("task %d %p set ssdata input", pTask->taskId, pTask); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false); - } else if (pItem->type == STREAM_INPUT__DROP) { - // TODO exec drop - return 0; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; + SArray* blocks = pMerged->reqs; + qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false); + } else { + ASSERT(0); } // exec @@ -144,7 +148,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { while (1) { - int32_t cnt = 0; + int32_t cnt = 1; void* data = NULL; while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); @@ -155,24 +159,23 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); - if (qItem->type == STREAM_INPUT__DATA_BLOCK) { - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ - } else { - break; - } + /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + /*}*/ } else { - if (streamAppendQueueItem(data, qItem) < 0) { + void* newRet; + if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { cnt++; + data = newRet; /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ streamQueueProcessSuccess(pTask->inputQueue); - taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); - taosFreeQitem(qItem); } } } + if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (data) streamFreeQitem(data); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); @@ -194,6 +197,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); if (qRes == NULL) { + // TODO log failed ver streamQueueProcessFail(pTask->inputQueue); taosArrayDestroy(pRes); return NULL; @@ -201,6 +205,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; if (streamTaskOutput(pTask, qRes) < 0) { + // TODO log failed ver /*streamQueueProcessFail(pTask->inputQueue);*/ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c47964803a..8b4225c80c 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -417,7 +417,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { } if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { - wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, + wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; @@ -425,7 +425,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { if (pRead->curInvalid || pRead->curVersion != ver) { if (walReadSeekVer(pRead, ver) < 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); return -1; } seeked = true; @@ -452,7 +452,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { contLen = walValidHeadCksum(pRead->pHead); if (contLen != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, + ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { } if (pRead->pHead->head.version != ver) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; @@ -489,7 +490,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { contLen = walValidBodyCksum(pRead->pHead); if (contLen != 0) { - wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, + ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 23e79da948..6095b67588 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -173,6 +173,46 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) pArray->size = pos + 1; } +void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { + assert(pArray); + + size_t size = pArray->size; + if (size <= 1) { + return; + } + + int32_t pos = 0; + for (int32_t i = 1; i < size; ++i) { + char* p1 = taosArrayGet(pArray, pos); + char* p2 = taosArrayGet(pArray, i); + + if (comparFn(p1, p2) == 0) { + // do nothing + } else { + if (pos + 1 != i) { + void* p = taosArrayGet(pArray, pos + 1); + if (fp != NULL) { + fp(p); + } + + taosArraySet(pArray, pos + 1, p2); + pos += 1; + } else { + pos += 1; + } + } + } + + if (fp != NULL) { + for (int32_t i = pos + 1; i < pArray->size; ++i) { + void* p = taosArrayGetP(pArray, i); + fp(p); + } + } + + pArray->size = pos + 1; +} + void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { if (pInput) { return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py index b99704b602..145cbbbbf5 100644 --- a/tests/system-test/7-tmq/subscribeDb4.py +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -88,7 +88,7 @@ class TDTestCase: tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) tdLog.info("After waiting for a period of time, drop one stable") - time.sleep(10) + time.sleep(3) tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) tdLog.info("wait result from consumer, then check it")