Merge pull request #16981 from taosdata/feature/stream
enh(stream): blocklist do not copy
This commit is contained in:
commit
84d09a5e43
|
@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1;
|
|||
int32_t tsNumOfMnodeReadThreads = 1;
|
||||
int32_t tsNumOfVnodeQueryThreads = 4;
|
||||
int32_t tsNumOfVnodeStreamThreads = 2;
|
||||
int32_t tsNumOfVnodeFetchThreads = 4;
|
||||
int32_t tsNumOfVnodeFetchThreads = 1;
|
||||
int32_t tsNumOfVnodeWriteThreads = 2;
|
||||
int32_t tsNumOfVnodeSyncThreads = 2;
|
||||
int32_t tsNumOfVnodeRsmaThreads = 2;
|
||||
|
@ -165,8 +165,8 @@ int32_t tsMqRebalanceInterval = 2;
|
|||
int32_t tsTtlUnit = 86400;
|
||||
int32_t tsTtlPushInterval = 86400;
|
||||
int32_t tsGrantHBInterval = 60;
|
||||
int32_t tsUptimeInterval = 300; // seconds
|
||||
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
|
||||
int32_t tsUptimeInterval = 300; // seconds
|
||||
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
|
||||
|
||||
#ifndef _STORAGE
|
||||
int32_t taosSetTfsCfg(SConfig *pCfg) {
|
||||
|
@ -371,9 +371,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
|
||||
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
|
||||
tsNumOfVnodeFetchThreads = 1;
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
|
||||
|
||||
tsNumOfVnodeWriteThreads = tsNumOfCores;
|
||||
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
|
||||
|
|
|
@ -479,6 +479,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
|
||||
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
|
||||
|
||||
#if 1
|
||||
|
||||
#endif
|
||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
|
||||
#if 0
|
||||
// TODO: if a block was set but not consumed,
|
||||
// prevent setting a different type of block
|
||||
pInfo->validBlockIndex = 0;
|
||||
|
@ -57,6 +58,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
} else {
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
}
|
||||
#endif
|
||||
|
||||
ASSERT(pInfo->validBlockIndex == 0);
|
||||
ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);
|
||||
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
// ASSERT(numOfBlocks > 1);
|
||||
|
@ -79,7 +84,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
|
||||
|
||||
#if 0
|
||||
// TODO optimize
|
||||
SSDataBlock* p = createOneDataBlock(pDataBlock, false);
|
||||
p->info = pDataBlock->info;
|
||||
|
@ -87,6 +94,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
taosArrayClear(p->pDataBlock);
|
||||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||
taosArrayPush(pInfo->pBlockLists, &p);
|
||||
#endif
|
||||
}
|
||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||
} else {
|
||||
|
@ -100,6 +108,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }
|
||||
|
||||
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
||||
#if 0
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
|
||||
return;
|
||||
|
@ -116,6 +125,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
|
|||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
||||
|
|
|
@ -920,6 +920,19 @@ _error:
|
|||
}
|
||||
|
||||
static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||
#if 0
|
||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
for (int32_t i = 0; i < total; i++) {
|
||||
SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i);
|
||||
taosArrayDestroy(p->pDataBlock);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
pInfo->validBlockIndex = 0;
|
||||
#if 0
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
pInfo->validBlockIndex = 0;
|
||||
|
@ -928,6 +941,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
|||
blockDataDestroy(p);
|
||||
}
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
#endif
|
||||
}
|
||||
|
||||
static bool isSessionWindow(SStreamScanInfo* pInfo) {
|
||||
|
@ -1580,9 +1594,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
// TODO: refactor
|
||||
FETCH_NEXT_BLOCK:
|
||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||
if (pInfo->validBlockIndex >= total) {
|
||||
/*doClearBufferedBlocks(pInfo);*/
|
||||
doClearBufferedBlocks(pInfo);
|
||||
/*pOperator->status = OP_EXEC_DONE;*/
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1606,27 +1621,40 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
} break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
printDataBlock(pBlock, "stream scan delete recv");
|
||||
SSDataBlock* pDelBlock = NULL;
|
||||
if (pInfo->tqReader) {
|
||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
filterDelBlockByUid(pDelBlock, pBlock, pInfo);
|
||||
pBlock = pDelBlock;
|
||||
} else {
|
||||
pDelBlock = pBlock;
|
||||
}
|
||||
printDataBlock(pBlock, "stream scan delete recv filtered");
|
||||
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
|
||||
generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes);
|
||||
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
||||
printDataBlock(pBlock, "stream scan delete result");
|
||||
return pInfo->pDeleteDataRes;
|
||||
printDataBlock(pDelBlock, "stream scan delete result");
|
||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||
return pInfo->pDeleteDataRes;
|
||||
} else {
|
||||
goto FETCH_NEXT_BLOCK;
|
||||
}
|
||||
} else {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->updateResIndex = 0;
|
||||
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes);
|
||||
generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
printDataBlock(pBlock, "stream scan delete data");
|
||||
return pInfo->pDeleteDataRes;
|
||||
printDataBlock(pDelBlock, "stream scan delete data");
|
||||
if (pInfo->tqReader) {
|
||||
blockDataDestroy(pDelBlock);
|
||||
}
|
||||
if (pInfo->pDeleteDataRes->info.rows > 0) {
|
||||
return pInfo->pDeleteDataRes;
|
||||
} else {
|
||||
goto FETCH_NEXT_BLOCK;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
|
@ -1688,10 +1716,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
NEXT_SUBMIT_BLK:
|
||||
while (1) {
|
||||
if (pInfo->tqReader->pMsg == NULL) {
|
||||
if (pInfo->validBlockIndex >= totBlockNum) {
|
||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||
doClearBufferedBlocks(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1763,7 +1793,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
qDebug("scan rows: %d", pBlockInfo->rows);
|
||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||
if (pBlockInfo->rows > 0) {
|
||||
return pInfo->pRes;
|
||||
} else {
|
||||
goto NEXT_SUBMIT_BLK;
|
||||
}
|
||||
/*return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;*/
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
|
|
|
@ -5658,7 +5658,6 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock*
|
|||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardRows = 0;
|
||||
int32_t aa = 4;
|
||||
|
||||
ASSERT(pSDataBlock->pDataBlock != NULL);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
|
|
Loading…
Reference in New Issue