refactor(sma): remove option
This commit is contained in:
parent
00acf4520c
commit
5e1bbe0e7e
|
@ -64,8 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
||||||
* @param SReadHandle
|
* @param SReadHandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema);
|
||||||
SSchemaWrapper** pSchema);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the input data block for the stream scan.
|
* Set the input data block for the stream scan.
|
||||||
|
@ -74,7 +73,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid);
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set multiple input data blocks for the stream scan.
|
* Set multiple input data blocks for the stream scan.
|
||||||
|
@ -84,7 +83,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid);
|
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the table id list, add or remove.
|
* Update the table id list, add or remove.
|
||||||
|
|
|
@ -611,8 +611,8 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%"PRIi64, SMA_VID(pSma),
|
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64,
|
||||||
suid, pItem->level, output->info.version);
|
SMA_VID(pSma), suid, pItem->level, output->info.version);
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
taosArrayClear(pResult);
|
taosArrayClear(pResult);
|
||||||
|
@ -644,7 +644,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
|
||||||
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
|
||||||
pItem->taskInfo, suid);
|
pItem->taskInfo, suid);
|
||||||
|
|
||||||
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // INPUT__DATA_SUBMIT
|
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType) < 0) { // INPUT__DATA_SUBMIT
|
||||||
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -1329,7 +1329,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
tdRefRSmaInfo(pSma, pRSmaInfo);
|
tdRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
|
||||||
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
|
||||||
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
|
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK);
|
||||||
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
||||||
|
|
||||||
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||||
|
@ -1356,4 +1356,4 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,8 +30,7 @@ static void cleanupRefPool() {
|
||||||
taosCloseRef(ref);
|
taosCloseRef(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid,
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||||
char* id) {
|
|
||||||
ASSERT(pOperator != NULL);
|
ASSERT(pOperator != NULL);
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
if (pOperator->numOfDownstream == 0) {
|
if (pOperator->numOfDownstream == 0) {
|
||||||
|
@ -44,12 +43,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id);
|
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
|
||||||
} else {
|
} else {
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
pInfo->assignBlockUid = assignUid;
|
/*pInfo->assignBlockUid = assignUid;*/
|
||||||
|
|
||||||
// TODO: if a block was set but not consumed,
|
// TODO: if a block was set but not consumed,
|
||||||
// prevent setting a different type of block
|
// prevent setting a different type of block
|
||||||
|
@ -95,11 +94,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
|
||||||
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
return qSetMultiStreamInput(tinfo, input, 1, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool assignUid) {
|
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
|
||||||
if (tinfo == NULL) {
|
if (tinfo == NULL) {
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -110,8 +109,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
|
||||||
int32_t code =
|
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
|
||||||
doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||||
} else {
|
} else {
|
||||||
|
@ -337,7 +335,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
||||||
}
|
}
|
||||||
|
|
||||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFreeClear(pSinkParam);
|
taosMemoryFreeClear(pSinkParam);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,22 +22,22 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
SStreamQueueItem* pItem = (SStreamQueueItem*)data;
|
SStreamQueueItem* pItem = (SStreamQueueItem*)data;
|
||||||
if (pItem->type == STREAM_INPUT__GET_RES) {
|
if (pItem->type == STREAM_INPUT__GET_RES) {
|
||||||
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
|
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
|
||||||
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false);
|
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
ASSERT(pTask->isDataScan);
|
ASSERT(pTask->isDataScan);
|
||||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||||
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
|
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
|
||||||
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
|
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT);
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
|
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
|
||||||
SArray* blocks = pBlock->blocks;
|
SArray* blocks = pBlock->blocks;
|
||||||
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
|
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
|
||||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
|
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK);
|
||||||
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
|
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
|
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
|
||||||
SArray* blocks = pMerged->reqs;
|
SArray* blocks = pMerged->reqs;
|
||||||
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
|
qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size);
|
||||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false);
|
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue