refactor: do some internal refactor.
This commit is contained in:
parent
6cef3baa95
commit
24090f0fc3
|
@ -538,7 +538,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
|
||||
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
|
||||
if (functionNeedToExecute(&pCtx[k])) {
|
||||
// todo add a dummy funtion to avoid process check
|
||||
|
@ -2969,25 +2969,10 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
// the pDataBlock are always the same one, no need to call this again
|
||||
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
|
||||
code = doAggregateImpl(pOperator, 0, pSup->pCtx);
|
||||
code = doAggregateImpl(pOperator, pSup->pCtx);
|
||||
if (code != 0) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
#if 0 // test for encode/decode result info
|
||||
if(pOperator->fpSet.encodeResultRow){
|
||||
char *result = NULL;
|
||||
int32_t length = 0;
|
||||
pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
|
||||
SAggSupporter* pSup = &pAggInfo->aggSup;
|
||||
taosHashClear(pSup->pResultRowHashTable);
|
||||
pInfo->resultRowInfo.size = 0;
|
||||
pOperator->fpSet.decodeResultRow(pOperator, result);
|
||||
if(result){
|
||||
taosMemoryFree(result);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
|
||||
|
|
|
@ -2860,101 +2860,3 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SLastrowScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t size = taosArrayGetSize(pInfo->pTableList);
|
||||
if (size == 0) {
|
||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// check if it is a group by tbname
|
||||
if (size == taosArrayGetSize(pInfo->pTableList)) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds);
|
||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||
} else {
|
||||
// todo fetch the result for each group
|
||||
}
|
||||
|
||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
|
||||
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
|
||||
blockDataDestroy(pInfo->pRes);
|
||||
tsdbLastrowReaderClose(pInfo->pLastrowReader);
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pTableList = pTableList;
|
||||
pInfo->readHandle = *readHandle;
|
||||
pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols,
|
||||
COL_MATCH_FROM_COL_ID);
|
||||
int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
|
||||
pCols[i] = pColMatch->colId;
|
||||
}
|
||||
|
||||
pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0]));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
|
||||
for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) {
|
||||
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId &&
|
||||
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
pInfo->pSlotIds[pColMatch->targetSlotId] = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) {
|
||||
pInfo->pSlotIds[pColMatch->targetSlotId] = j;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols,
|
||||
&pInfo->pLastrowReader);
|
||||
taosMemoryFree(pCols);
|
||||
|
||||
pOperator->name = "LastrowScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
|
||||
pOperator->cost.openCost = 0;
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -2729,7 +2729,6 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
pInfo->hasResult = true;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
pResInfo->numOfRes = 1;
|
||||
break;
|
||||
}
|
||||
|
@ -2826,7 +2825,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
pInfo->hasResult = true;
|
||||
pResInfo->numOfRes = 1;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -5980,6 +5978,15 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
pInfo->hasResult = true;
|
||||
pResInfo->numOfRes = 1;
|
||||
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
||||
if (!pInfo->hasResult) {
|
||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
} else {
|
||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue