Merge pull request #15139 from taosdata/feature/stream

refactor(stream): batch optimization for submit msg
This commit is contained in:
Liu Jicong 2022-07-19 20:27:39 +08:00 committed by GitHub
commit 5d6df744d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 248 additions and 77 deletions

View File

@ -54,12 +54,12 @@ enum {
enum { enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT,
// STREAM_INPUT__TABLE_SCAN, // STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN, STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
}; };
typedef enum EStreamType { typedef enum EStreamType {

View File

@ -77,6 +77,13 @@ typedef struct {
SSubmitReq* data; SSubmitReq* data;
} SStreamDataSubmit; } SStreamDataSubmit;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit;
typedef struct { typedef struct {
int8_t type; int8_t type;

View File

@ -87,6 +87,14 @@ void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfEle
*/ */
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/**
*
* @param pArray
* @param comparFn
* @param fp
*/
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/** /**
* add all element from the source array list into the destination * add all element from the source array list into the destination
* @param pArray * @param pArray

View File

@ -452,7 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SArray *newSub = subscribe.topicNames; SArray *newSub = subscribe.topicNames;
taosArraySortString(newSub, taosArrayCompareString); taosArraySortString(newSub, taosArrayCompareString);
taosArrayRemoveDuplicate(newSub, taosArrayCompareString, taosMemoryFree); taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub); int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance // check topic existance

View File

@ -91,7 +91,7 @@ int metaBegin(SMeta* pMeta);
int metaCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);

View File

@ -212,7 +212,7 @@ _err:
return -1; return -1;
} }
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
void *pKey = NULL; void *pKey = NULL;
int nKey = 0; int nKey = 0;
void *pData = NULL; void *pData = NULL;
@ -228,8 +228,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
} }
// drop all child tables // drop all child tables
TBC *pCtbIdxc = NULL; TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
@ -249,20 +248,18 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
break; break;
} }
taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid)); taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid));
} }
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
metaWLock(pMeta); metaWLock(pMeta);
for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) { for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild); tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
metaDropTableByUid(pMeta, uid, NULL); metaDropTableByUid(pMeta, uid, NULL);
} }
taosArrayDestroy(pArray);
// drop super table // drop super table
_drop_super_table: _drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData); tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);

View File

@ -49,8 +49,8 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaGetTableEntryByUid(&mr, uid) < 0) { if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
return -1; return -1;
} }
char* tbName = strdup(mr.me.name); char* tbName = strdup(mr.me.name);
@ -87,16 +87,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
tqDebug("task execute end, get %p", pDataBlock); tqDebug("task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
if (pRsp->withTbName) { if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid; int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp); if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} else { } else {
pRsp->withTbName = 0; pRsp->withTbName = 0;
} }
} }
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
continue; continue;
} else { } else {
@ -193,13 +195,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
SSDataBlock block = {0}; SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) { if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp); if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
@ -211,13 +214,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
SSDataBlock block = {0}; SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) { if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp); if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} }
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }

View File

@ -299,6 +299,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
} }
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) { if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }

View File

@ -178,6 +178,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data; const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
tqDebug("task write into table, vgId %d, block num: %d", pVnode->config.vgId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId); pTask->tbSink.stbFullName, pVnode->config.vgId);

View File

@ -557,6 +557,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
SVDropStbReq req = {0}; SVDropStbReq req = {0};
int32_t rcode = TSDB_CODE_SUCCESS; int32_t rcode = TSDB_CODE_SUCCESS;
SDecoder decoder = {0}; SDecoder decoder = {0};
SArray *tbUidList = NULL;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
@ -570,7 +571,14 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
} }
// process request // process request
if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { tbUidList = taosArrayInit(8, sizeof(int64_t));
if (tbUidList == NULL) goto _exit;
if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) {
rcode = terrno;
goto _exit;
}
if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
rcode = terrno; rcode = terrno;
goto _exit; goto _exit;
} }
@ -582,6 +590,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
// return rsp // return rsp
_exit: _exit:
if (tbUidList) taosArrayDestroy(tbUidList);
pRsp->code = rcode; pRsp->code = rcode;
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;

View File

@ -42,17 +42,32 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block
pInfo->blockType = type; pInfo->validBlockIndex = 0;
taosArrayClear(pInfo->pBlockLists);
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) { ASSERT(numOfBlocks > 1);
qError("submit msg messed up when initing stream block, %s" PRIx64, id); for (int32_t i = 0; i < numOfBlocks; i++) {
return TSDB_CODE_QRY_APP_ERROR; SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
} }
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
/*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/
/*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/
/*return TSDB_CODE_QRY_APP_ERROR;*/
/*}*/
ASSERT(numOfBlocks == 1);
/*if (numOfBlocks == 1) {*/
taosArrayPush(pInfo->pBlockLists, &input);
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
/*} else {*/
/*}*/
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
// TODO optimize
SSDataBlock* p = createOneDataBlock(pDataBlock, false); SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info; p->info = pDataBlock->info;
@ -60,6 +75,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -167,7 +183,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return pTaskInfo; return pTaskInfo;
} }
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) { static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList,
const char* idstr) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table. // let's discard the tables those are not created according to the queried super table.

View File

@ -1392,24 +1392,49 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) { int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
SSDataBlock block = {0};
// todo refactor while (1) {
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
return NULL;
}
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { int32_t current = pInfo->validBlockIndex++;
pTaskInfo->code = code; SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
return NULL; if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum);
pInfo->tqReader->pMsg = NULL;
continue;
}
} }
setBlockIntoRes(pInfo, &block); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block);
if (pBlockInfo->rows > 0) {
break;
}
}
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
break; break;
} else {
pInfo->tqReader->pMsg = NULL;
continue;
} }
/*blockDataCleanup(pInfo->pRes);*/
} }
// record the scan action. // record the scan action.
@ -2557,30 +2582,30 @@ typedef struct STableMergeScanInfo {
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset; int32_t* rowEntryInfoOffset;
SExprInfo* pExpr; SExprInfo* pExpr;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SArray* pColMatchInfo; SArray* pColMatchInfo;
int32_t numOfOutput; int32_t numOfOutput;
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx; SqlFunctionCtx* pPseudoCtx;
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
SInterval interval; SInterval interval;
@ -2588,7 +2613,8 @@ typedef struct STableMergeScanInfo {
} STableMergeScanInfo; } STableMergeScanInfo;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;

View File

@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -77,6 +77,28 @@ FAIL:
return NULL; return NULL;
} }
SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged;
FAIL:
if (pMerged->reqs) taosArrayDestroy(pMerged->reqs);
if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
taosArrayPush(pMerged->reqs, &pSubmit->data);
pMerged->ver = pSubmit->ver;
return 0;
}
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
atomic_add_fetch_32(pDataSubmit->dataRef, 1); atomic_add_fetch_32(pDataSubmit->dataRef, 1);
} }
@ -100,15 +122,31 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
} }
} }
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
ASSERT(elem); ASSERT(elem);
if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) { if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
return 0; taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(elem);
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(elem);
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
ASSERT(pMerged);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem);
taosFreeQitem(dst);
taosFreeQitem(elem);
return (SStreamQueueItem*)pMerged;
} else { } else {
return -1; return NULL;
} }
} }
@ -123,5 +161,20 @@ void streamFreeQitem(SStreamQueueItem* data) {
} else if (type == STREAM_INPUT__DATA_SUBMIT) { } else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data); streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data); taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
int32_t sz = taosArrayGetSize(pMerge->reqs);
for (int32_t i = 0; i < sz; i++) {
int32_t* ref = taosArrayGetP(pMerge->dataRefs, i);
(*ref)--;
if (*ref == 0) {
void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data);
taosMemoryFree(ref);
}
}
taosArrayDestroy(pMerge->reqs);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
} }
} }

View File

@ -33,9 +33,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask); qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
// TODO exec drop SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
return 0; SArray* blocks = pMerged->reqs;
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false);
} else {
ASSERT(0);
} }
// exec // exec
@ -144,7 +148,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
while (1) { while (1) {
int32_t cnt = 0; int32_t cnt = 1;
void* data = NULL; void* data = NULL;
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
@ -155,24 +159,23 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) { if (data == NULL) {
data = qItem; data = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (qItem->type == STREAM_INPUT__DATA_BLOCK) { /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
} else { /*}*/
break;
}
} else { } else {
if (streamAppendQueueItem(data, qItem) < 0) { void* newRet;
if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
break; break;
} else { } else {
cnt++; cnt++;
data = newRet;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
} }
} }
} }
if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (data) streamFreeQitem(data); if (data) streamFreeQitem(data);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
@ -194,6 +197,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) { if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return NULL; return NULL;
@ -201,6 +205,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) { if (streamTaskOutput(pTask, qRes) < 0) {
// TODO log failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/ /*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes); taosFreeQitem(qRes);

View File

@ -417,7 +417,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
} }
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
@ -425,7 +425,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
if (pRead->curInvalid || pRead->curVersion != ver) { if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) { if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
return -1; return -1;
} }
seeked = true; seeked = true;
@ -452,7 +452,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
contLen = walValidHeadCksum(pRead->pHead); contLen = walValidHeadCksum(pRead->pHead);
if (contLen != 0) { if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
} }
if (pRead->pHead->head.version != ver) { if (pRead->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver); pRead->pHead->head.version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
@ -489,7 +490,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
contLen = walValidBodyCksum(pRead->pHead); contLen = walValidBodyCksum(pRead->pHead);
if (contLen != 0) { if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);

View File

@ -173,6 +173,46 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
pArray->size = pos + 1; pArray->size = pos + 1;
} }
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
}
int32_t pos = 0;
for (int32_t i = 1; i < size; ++i) {
char* p1 = taosArrayGet(pArray, pos);
char* p2 = taosArrayGet(pArray, i);
if (comparFn(p1, p2) == 0) {
// do nothing
} else {
if (pos + 1 != i) {
void* p = taosArrayGet(pArray, pos + 1);
if (fp != NULL) {
fp(p);
}
taosArraySet(pArray, pos + 1, p2);
pos += 1;
} else {
pos += 1;
}
}
}
if (fp != NULL) {
for (int32_t i = pos + 1; i < pArray->size; ++i) {
void* p = taosArrayGetP(pArray, i);
fp(p);
}
}
pArray->size = pos + 1;
}
void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { void* taosArrayAddAll(SArray* pArray, const SArray* pInput) {
if (pInput) { if (pInput) {
return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput));

View File

@ -88,7 +88,7 @@ class TDTestCase:
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tdLog.info("After waiting for a period of time, drop one stable") tdLog.info("After waiting for a period of time, drop one stable")
time.sleep(10) time.sleep(3)
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
tdLog.info("wait result from consumer, then check it") tdLog.info("wait result from consumer, then check it")