Merge pull request #17701 from taosdata/feature/stream
fix(query): set block capacity
This commit is contained in:
commit
52d8f7bd35
|
@ -89,6 +89,16 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
||||||
*/
|
*/
|
||||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
|
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set block for sma
|
||||||
|
* @param tinfo
|
||||||
|
* @param pBlocks
|
||||||
|
* @param numOfInputBlock
|
||||||
|
* @param type
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the table id list, add or remove.
|
* Update the table id list, add or remove.
|
||||||
*
|
*
|
||||||
|
|
|
@ -15,11 +15,11 @@
|
||||||
|
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
|
|
||||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||||
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
#define RSMA_FETCH_DELAY_MAX (120000) // ms
|
||||||
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
|
#define RSMA_FETCH_ACTIVE_MAX (1000) // ms
|
||||||
#define RSMA_FETCH_INTERVAL (5000) // ms
|
#define RSMA_FETCH_INTERVAL (5000) // ms
|
||||||
|
|
||||||
SSmaMgmt smaMgmt = {
|
SSmaMgmt smaMgmt = {
|
||||||
.inited = 0,
|
.inited = 0,
|
||||||
|
@ -839,7 +839,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
|
||||||
tdRsmaPrintSubmitReq(pSma, pReq);
|
tdRsmaPrintSubmitReq(pSma, pReq);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
|
if (qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
|
||||||
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -1404,7 +1404,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
|
||||||
|
|
||||||
pItem->nScanned = 0;
|
pItem->nScanned = 0;
|
||||||
|
|
||||||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
|
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
|
||||||
|
|
|
@ -30,6 +30,46 @@ static void cleanupRefPool() {
|
||||||
taosCloseRef(ref);
|
taosCloseRef(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||||
|
ASSERT(pOperator != NULL);
|
||||||
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
if (pOperator->numOfDownstream == 0) {
|
||||||
|
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||||
|
qError("join not supported for stream block scan, %s" PRIx64, id);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
|
||||||
|
} else {
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
|
||||||
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
|
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
|
||||||
|
taosArrayPush(pInfo->pBlockLists, &pReq);
|
||||||
|
}
|
||||||
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
|
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
taosArrayPush(pInfo->pBlockLists, &input);
|
||||||
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
|
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
|
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||||
|
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
|
||||||
|
}
|
||||||
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||||
ASSERT(pOperator != NULL);
|
ASSERT(pOperator != NULL);
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
@ -100,6 +140,27 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
||||||
|
if (tinfo == NULL) {
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlocks == NULL || numOfBlocks == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
|
int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo));
|
||||||
|
} else {
|
||||||
|
qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
// create raw scan
|
// create raw scan
|
||||||
|
|
|
@ -1738,8 +1738,10 @@ static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, S
|
||||||
j++;
|
j++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
uint32_t cap = pDst->info.capacity;
|
||||||
pDst->info = pSrc->info;
|
pDst->info = pSrc->info;
|
||||||
pDst->info.rows = j;
|
pDst->info.rows = j;
|
||||||
|
pDst->info.capacity = cap;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue