adj operator result

This commit is contained in:
54liuyao 2024-07-24 16:17:28 +08:00
parent a09ad7a81e
commit 5a34a1142f
16 changed files with 823 additions and 382 deletions

View File

@ -252,14 +252,15 @@ _end:
return pBlock != NULL;
}
SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -291,10 +292,18 @@ SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return (rows == 0) ? NULL : pInfo->pRes;
(*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
return code;
}
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = getAggregateResultNext(pOperator, &pRes);
return pRes;
}
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {

View File

@ -219,9 +219,12 @@ _error:
return NULL;
}
SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SCacheRowsScanInfo* pInfo = pOperator->info;
@ -234,7 +237,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
int32_t size = tableListGetSize(pTableList);
if (size == 0) {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
blockDataCleanup(pInfo->pRes);
@ -249,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup(pBufRes);
taosArrayClear(pInfo->pUidList);
int32_t code =
code =
pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
// check for tag values
int32_t resultRows = pBufRes->info.rows;
@ -278,11 +280,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} else {
if (pSrc->pData) {
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
int32_t code = colDataSetVal(pDst, 0, p, false);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
code = colDataSetVal(pDst, 0, p, false);
QUERY_CHECK_CODE(code, lino, _end);
}
}
}
@ -292,19 +291,22 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes->info.scanFlag = MAIN_SCAN;
SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, pTaskInfo, NULL);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
(*ppRes) = NULL;
return code;
}
pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
pInfo->indexOfBufferedRes += 1;
return pRes;
(*ppRes) = pRes;
return code;
} else {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
} else {
size_t totalGroups = tableListGetOutputGroups(pTableList);
@ -317,37 +319,30 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
QUERY_CHECK_CODE(code, lino, _end);
if (NULL == pInfo->pLastrowReader) {
code = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
&pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol,
pInfo->numOfPks);
int32_t tmpRes = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList,
pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) {
if (tmpRes != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);
continue;
}
} else {
code = pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayClear(pInfo->pUidList);
code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
pInfo->currentGroupIndex += 1;
@ -365,13 +360,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows, pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
(*ppRes) = NULL;
return code;
}
}
}
// pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
} else {
// pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
}
@ -380,8 +377,23 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pReaderFn->closeReader(pInfo->pLastrowReader);
pInfo->pLastrowReader = NULL;
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doScanCacheNext(pOperator, &pRes);
return pRes;
}
void destroyCacheScanOperator(void* param) {

View File

@ -171,7 +171,7 @@ _end:
}
}
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SCountWindowOperatorInfo* pInfo = pOperator->info;
@ -198,11 +198,9 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->groupId == 0) {
@ -214,7 +212,8 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
doCountWindowAggImpl(pOperator, pBlock);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
(*ppRes) = pRes;
return code;
}
}
@ -223,9 +222,17 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return pRes->info.rows == 0 ? NULL : pRes;
(*ppRes) = pRes->info.rows == 0 ? NULL : pRes;
return code;
}
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = countWindowAggregateNext(pOperator, &pRes);
return pRes;
}
SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,

View File

@ -168,7 +168,7 @@ void destroyEWindowOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SEventWindowOperatorInfo* pInfo = pOperator->info;
@ -197,11 +197,9 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
QUERY_CHECK_CODE(code, lino, _end);
}
code = eventWindowAggImpl(pOperator, pInfo, pBlock);
@ -211,16 +209,25 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
(*ppRes) = pRes;
return code;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return pRes->info.rows == 0 ? NULL : pRes;
(*ppRes) = pRes->info.rows == 0 ? NULL : pRes;
return code;
}
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = eventWindowAggregateNext(pOperator, &pRes);
return pRes;
}
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,

View File

@ -230,30 +230,30 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
}
}
static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
while (1) {
SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
if (pBlock == NULL) {
return NULL;
(*ppRes) = NULL;
return code;
}
pTaskInfo->code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}
@ -266,15 +266,33 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
} else if (status == PROJECT_RETRIEVE_DONE) {
if (pBlock->info.rows == 0) {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
} else {
return pBlock;
(*ppRes) = pBlock;
return code;
}
}
} else {
return pBlock;
(*ppRes) = pBlock;
return code;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
}
static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = loadRemoteDataNext(pOperator, &pRes);
return pRes;
}
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {

View File

@ -316,12 +316,14 @@ _end:
return NULL;
}
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* fillResult = NULL;
@ -332,9 +334,10 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break;
}
int32_t code = doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
code = doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (fillResult->info.rows > 0) {
@ -346,7 +349,14 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += fillResult->info.rows;
}
return fillResult;
(*ppRes) = fillResult;
return code;
}
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doFillNext(pOperator, &pRes);
return pRes;
}
void destroyFillOperatorInfo(void* param) {

View File

@ -460,9 +460,10 @@ _end:
return (pRes->info.rows == 0) ? NULL : pRes;
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -471,7 +472,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return buildGroupResultDataBlockByHash(pOperator);
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
return code;
}
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
@ -523,7 +525,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return buildGroupResultDataBlockByHash(pOperator);
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
return code;
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = hashGroupbyAggregateNext(pOperator, &pRes);
return pRes;
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
@ -978,9 +987,10 @@ _end:
return pInfo->binfo.pRes;
}
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -990,7 +1000,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
return buildPartitionResult(pOperator);
(*ppRes) = buildPartitionResult(pOperator);
return code;
}
int64_t st = taosGetTimestampUs();
@ -1005,21 +1016,21 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
pInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
QUERY_CHECK_CODE(code, lino, _end);
}
terrno = TSDB_CODE_SUCCESS;
doHashPartition(pOperator, pBlock);
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
T_LONG_JMP(pTaskInfo->env, terrno);
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
}
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
QUERY_CHECK_NULL(groupArray, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
while (pGroupIter != NULL) {
@ -1043,10 +1054,18 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return buildPartitionResult(pOperator);
(*ppRes) = buildPartitionResult(pOperator);
return code;
}
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = hashPartitionNext(pOperator, &pRes);
return pRes;
}
static void destroyPartitionOperatorInfo(void* param) {
@ -1413,26 +1432,29 @@ _end:
}
}
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
if (hasRemainTbName(pInfo)) {
code = buildStreamCreateTableResult(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes && pInfo->pCreateTbRes->info.rows > 0) {
return pInfo->pCreateTbRes;
(*ppRes) = pInfo->pCreateTbRes;
return code;
}
}
if (hasRemainPartion(pInfo)) {
return buildStreamPartitionResult(pOperator);
(*ppRes) = buildStreamPartitionResult(pOperator);
return code;
}
int64_t st = taosGetTimestampUs();
@ -1442,7 +1464,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) {
@ -1457,13 +1480,15 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
(*ppRes) = pInfo->pDelRes;
return code;
} break;
case STREAM_CREATE_CHILD_TABLE:
case STREAM_RETRIEVE:
case STREAM_CHECKPOINT:
case STREAM_GET_ALL: {
return pBlock;
(*ppRes) = pBlock;
return code;
}
default:
ASSERTS(0, "invalid SSDataBlock type");
@ -1485,15 +1510,25 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
code = buildStreamCreateTableResult(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes && pInfo->pCreateTbRes->info.rows > 0) {
return pInfo->pCreateTbRes;
(*ppRes) = pInfo->pCreateTbRes;
return code;
}
return buildStreamPartitionResult(pOperator);
(*ppRes) = buildStreamPartitionResult(pOperator);
return code;
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamHashPartitionNext(pOperator, &pRes);
return pRes;
}
static void destroyStreamPartitionOperatorInfo(void* param) {

View File

@ -842,23 +842,24 @@ static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STable
return pBlock;
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
int32_t code = TSDB_CODE_SUCCESS;
pBlock->info.dataLoad = false;
int64_t st = taosGetTimestampUs();
while (true) {
code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
if (code) {
if (code != TSDB_CODE_SUCCESS) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, code);
QUERY_CHECK_CODE(code, lino, _end);
}
if (!hasNext) {
@ -887,9 +888,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
uint32_t status = 0;
code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
break;
@ -905,9 +904,24 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
pBlock->info.scanFlag = pTableScanInfo->base.scanFlag;
return pBlock;
(*ppRes) = pBlock;
return code;
}
return NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableScanImplNext(pOperator, &pRes);
return pRes;
}
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
@ -1177,7 +1191,7 @@ _end:
return result;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableScanInfo* pInfo = pOperator->info;
@ -1189,10 +1203,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
code = createTableListInfoFromParam(pOperator);
freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
pOperator->pOperatorGetParam = NULL;
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
if (pOperator->status == OP_EXEC_DONE) {
pInfo->currentGroupId = -1;
pOperator->status = OP_OPENED;
@ -1200,7 +1212,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
while (true) {
result = startNextGroupScan(pOperator);
if (result || pOperator->status == OP_EXEC_DONE) {
return result;
(*ppRes) = result;
return code;
}
}
}
@ -1215,7 +1228,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
return result;
(*ppRes) = result;
return code;
}
// if no data, switch to next table and continue scan
@ -1227,7 +1241,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return NULL;
(*ppRes) = NULL;
return code;
}
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
@ -1243,7 +1258,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->scanTimes = 0;
}
} else { // scan table group by group sequentially
return groupSeqTableScan(pOperator);
(*ppRes) = groupSeqTableScan(pOperator);
return code;
}
_end:
@ -1252,7 +1268,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableScanNext(pOperator, &pRes);
return pRes;
}
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
@ -2648,7 +2671,7 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
}
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2660,7 +2683,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("start to exec queue scan, %s", id);
if (isTaskKilled(pTaskInfo)) {
return NULL;
(*ppRes) = NULL;
return code;
}
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
@ -2672,7 +2696,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
if (pResult->info.rows > 0) {
return pResult;
(*ppRes) = pResult;
return code;
}
} else {
break;
@ -2686,7 +2711,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1;
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer);
if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) {
return NULL;
(*ppRes) = NULL;
return code;
}
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer);
@ -2716,23 +2742,34 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("doQueueScan after filter get data from log %" PRId64 " rows, version:%" PRId64, pInfo->pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
}
} else {
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
return NULL;
(*ppRes) = NULL;
return code;
}
}
} else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
return NULL;
(*ppRes) = NULL;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doQueueScanNext(pOperator, &pRes);
return pRes;
}
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
@ -2923,7 +2960,7 @@ static bool isStreamWindow(SStreamScanInfo* pInfo) {
return isIntervalWindow(pInfo) || isSessionWindow(pInfo) || isStateWindow(pInfo) || isCountWindow(pInfo);
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// NOTE: this operator does never check if current status is done or not
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2973,7 +3010,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
if (isTaskKilled(pTaskInfo)) {
qInfo("===stream===stream scan is killed. task id:%s, code %s", id, tstrerror(pTaskInfo->code));
return NULL;
(*ppRes) = NULL;
return code;
}
switch (pInfo->scanMode) {
@ -2981,7 +3019,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover",
GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes;
(*ppRes) = pInfo->pRecoverRes;
return code;
} break;
default:
break;
@ -3002,13 +3041,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover",
GET_TASKID(pTaskInfo));
return pInfo->pCreateTbRes;
(*ppRes) = pInfo->pCreateTbRes;
return code;
}
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover",
GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes;
(*ppRes) = pInfo->pRecoverRes;
return code;
}
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
@ -3020,7 +3061,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.endVersion = -1;
pStreamInfo->recoverScanFinished = true;
return NULL;
(*ppRes) = NULL;
return code;
}
size_t total = taosArrayGetSize(pInfo->pBlockLists);
@ -3029,7 +3071,8 @@ FETCH_NEXT_BLOCK:
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
return NULL;
(*ppRes) = NULL;
return code;
}
int32_t current = pInfo->validBlockIndex++;
@ -3060,7 +3103,8 @@ FETCH_NEXT_BLOCK:
case STREAM_GET_ALL:
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
return pBlock;
(*ppRes) = pBlock;
return code;
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
@ -3107,7 +3151,8 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result",
GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
return pInfo->pDeleteDataRes;
(*ppRes) = pInfo->pDeleteDataRes;
return code;
} else {
goto FETCH_NEXT_BLOCK;
}
@ -3128,7 +3173,8 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result",
GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pInfo->pDeleteDataRes->info.type);
return pInfo->pDeleteDataRes;
(*ppRes) = pInfo->pDeleteDataRes;
return code;
} else {
goto FETCH_NEXT_BLOCK;
}
@ -3142,7 +3188,8 @@ FETCH_NEXT_BLOCK:
}
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
switch (pInfo->scanMode) {
@ -3158,7 +3205,8 @@ FETCH_NEXT_BLOCK:
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
}
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
@ -3169,14 +3217,16 @@ FETCH_NEXT_BLOCK:
code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
return pInfo->pDeleteDataRes;
(*ppRes) = pInfo->pDeleteDataRes;
return code;
} break;
case STREAM_SCAN_FROM_UPDATERES: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR);
QUERY_CHECK_CODE(code, lino, _end);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
return pInfo->pUpdateRes;
(*ppRes) = pInfo->pUpdateRes;
return code;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE:
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
@ -3193,7 +3243,8 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
code = calBlockTbName(pInfo, pSDB, 0);
QUERY_CHECK_CODE(code, lino, _end);
return pSDB;
(*ppRes) = pSDB;
return code;
}
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
@ -3212,7 +3263,8 @@ FETCH_NEXT_BLOCK:
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
printSpecDataBlock(pInfo->pUpdateRes, getStreamOpName(pOperator->operatorType), "rebuild", GET_TASKID(pTaskInfo));
return pInfo->pUpdateRes;
(*ppRes) = pInfo->pUpdateRes;
return code;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
@ -3226,7 +3278,8 @@ FETCH_NEXT_BLOCK:
doClearBufferedBlocks(pInfo);
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
return NULL;
(*ppRes) = NULL;
return code;
}
int32_t current = pInfo->validBlockIndex++;
@ -3264,7 +3317,8 @@ FETCH_NEXT_BLOCK:
pInfo->scanMode = STREAM_SCAN_FROM_RES;
qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s",
pInfo->pCreateTbRes->info.rows, id);
return pInfo->pCreateTbRes;
(*ppRes) = pInfo->pCreateTbRes;
return code;
}
code = doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
@ -3297,7 +3351,8 @@ FETCH_NEXT_BLOCK:
qDebug("stream scan completed, and return source rows:%" PRId64 ", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
}
if (pInfo->pUpdateDataRes->info.rows > 0) {
@ -3308,7 +3363,8 @@ FETCH_NEXT_BLOCK:
} else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
return NULL;
(*ppRes) = NULL;
return code;
}
int32_t current = pInfo->validBlockIndex++;
@ -3321,16 +3377,24 @@ FETCH_NEXT_BLOCK:
streamScanOperatorSaveCheckpoint(pInfo);
}
// printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamScanNext(pOperator, &pRes);
return pRes;
}
static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray** ppArrayRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -3353,7 +3417,7 @@ _end:
return code;
}
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3368,9 +3432,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
bool hasNext = false;
if (pInfo->dataReader && pInfo->sContext->withMeta != ONLY_META) {
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
if (code) {
if (code != TSDB_CODE_SUCCESS) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
T_LONG_JMP(pTaskInfo->env, code);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -3383,14 +3447,16 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = NULL;
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, &pBlock, NULL);
if (pBlock == NULL || code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
if (pBlock && pBlock->info.rows > 0) {
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
return pBlock;
(*ppRes) = pBlock;
return code;
}
}
@ -3407,7 +3473,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
code = qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
QUERY_CHECK_CODE(code, lino, _end);
tDeleteSchemaWrapper(mtInfo.schema);
return NULL;
(*ppRes) = NULL;
return code;
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext;
for (int32_t i = 0; i < tmqRowSize; i++) {
@ -3441,10 +3508,10 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
QUERY_CHECK_NULL(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t tempRes = TSDB_CODE_SUCCESS;
uint32_t len = 0;
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
if (TSDB_CODE_SUCCESS != code) {
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, tempRes);
if (TSDB_CODE_SUCCESS != tempRes) {
qError("tmqsnap tEncodeMqMetaRsp error");
taosMemoryFreeClear(data);
break;
@ -3452,11 +3519,13 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen);
QUERY_CHECK_NULL(tBuf, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, metaBuff, len);
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
if (code < 0) {
tempRes = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
if (TSDB_CODE_SUCCESS != tempRes) {
qError("tmqsnap tEncodeMqMetaRsp error");
tEncoderClear(&encoder);
taosMemoryFreeClear(tBuf);
@ -3471,7 +3540,8 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
}
}
return NULL;
(*ppRes) = NULL;
return code;
}
_end:
@ -3480,7 +3550,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doRawScanNext(pOperator, &pRes);
return pRes;
}
static void destroyRawScanOperatorInfo(void* param) {
@ -3570,6 +3647,7 @@ void streamScanReleaseState(SOperatorInfo* pOperator) {
return;
}
if (!pInfo->pUpdateInfo) {
qDebug("stask:%s streamScanReleaseState cancel", GET_TASKID(pOperator->pTaskInfo));
return;
}
int32_t len = 0;
@ -3602,6 +3680,10 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
if (!pInfo->pState) {
return;
}
if (!pInfo->pUpdateInfo) {
qDebug("stask:%s streamScanReloadState cancel", GET_TASKID(pOperator->pTaskInfo));
return;
}
void* pBuff = NULL;
int32_t len = 0;
code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME,
@ -4179,9 +4261,10 @@ _end:
return code;
}
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4268,12 +4351,21 @@ _end:
T_LONG_JMP(pTaskInfo->env, code);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
return code;
}
static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTagScanFromCtbIdxNext(pOperator, &pRes);
return pRes;
}
static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4287,7 +4379,8 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
int32_t size = tableListGetSize(pInfo->pTableListInfo);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
(*ppRes) = NULL;
return code;
}
char str[512] = {0};
@ -4316,7 +4409,14 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
return code;
}
static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTagScanFromMetaEntryNext(pOperator, &pRes);
return pRes;
}
static void destroyTagScanOperatorInfo(void* param) {
@ -4849,9 +4949,10 @@ static void stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
}
}
SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4871,7 +4972,8 @@ SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
if (tableListSize == 0) {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
@ -4919,7 +5021,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return pBlock;
(*ppRes) = pBlock;
return code;
}
static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes);
return pRes;
}
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeOpInfo) {
@ -5312,18 +5421,19 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
int64_t st = taosGetTimestampUs();
@ -5333,7 +5443,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (tableListSize == 0) {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
@ -5361,9 +5472,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (pInfo->bNewFilesetEvent) {
stopDurationForGroupTableMergeScan(pOperator);
code = startDurationForGroupTableMergeScan(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
QUERY_CHECK_CODE(code, lino, _end);
} else {
// Data of this group are all dumped, let's try the next group
stopGroupTableMergeScan(pOperator);
@ -5382,7 +5491,20 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
pOperator->cost.totalCost += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = pBlock;
return code;
}
static SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableMergeScanNext(pOperator, &pRes);
return pRes;
}
void destroyTableMergeScanOperatorInfo(void* param) {
@ -5841,7 +5963,8 @@ _end:
}
}
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableCountScanOperatorInfo* pInfo = pOperator->info;
STableCountScanSupp* pSupp = &pInfo->supp;
@ -5849,13 +5972,22 @@ static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
blockDataCleanup(pRes);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
if (pInfo->readHandle.mnd != NULL) {
return buildSysDbTableCount(pOperator, pInfo);
(*ppRes) = buildSysDbTableCount(pOperator, pInfo);
return code;
}
return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
(*ppRes) = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
return code;
}
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableCountScanNext(pOperator, &pRes);
return pRes;
}
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,

View File

@ -604,7 +604,7 @@ _end:
return code;
}
static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExprSupp* pSup = &pOperator->exprSupp;
@ -614,11 +614,13 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
} else if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* opRes = buildCountResult(pOperator);
if (opRes) {
return opRes;
(*ppRes) = opRes;
return code;
}
if (pInfo->recvGetAll) {
@ -629,11 +631,13 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -667,7 +671,8 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamCountSaveCheckpoint(pOperator);
@ -712,15 +717,24 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
SSDataBlock* opRes = buildCountResult(pOperator);
if (opRes) {
return opRes;
(*ppRes) = opRes;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamCountAggNext(pOperator, &pRes);
return pRes;
}
void streamCountReleaseState(SOperatorInfo* pOperator) {
@ -729,10 +743,8 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = sizeof(TSKEY);
char* pBuff = taosMemoryCalloc(1, resSize);
if (pBuff) {
code = terrno;
QUERY_CHECK_CODE(code, lino, _end);
}
QUERY_CHECK_NULL(pBuff, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
qDebug("===stream=== count window operator relase state. ");
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
@ -745,6 +757,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
}
_end:
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
@ -773,6 +786,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}

View File

@ -382,8 +382,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
code = tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
QUERY_CHECK_CODE(code, lino, _end);
int32_t tmpRes = tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) &&
@ -574,9 +574,10 @@ static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) {
return NULL;
}
static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -589,7 +590,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = buildEventResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->recvGetAll) {
@ -600,11 +602,13 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -636,7 +640,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
doStreamEventSaveCheckpoint(pOperator);
@ -697,15 +702,24 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
SSDataBlock* resBlock = buildEventResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamEventAggNext(pOperator, &pRes);
return pRes;
}
void streamEventReleaseState(SOperatorInfo* pOperator) {

View File

@ -563,8 +563,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
QUERY_CHECK_CODE(code, lino, _end);
if ((pFillSup->hasDelete && !ckRes) ||
!inWinRange(&pFillSup->winRange, &st)) {
if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
pFillInfo->pLinearInfo->winIndex++;
@ -1013,14 +1012,15 @@ _end:
return code;
}
static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
blockDataCleanup(pInfo->pRes);
if (hasRemainCalc(pInfo->pFillInfo) ||
@ -1028,18 +1028,21 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
}
}
if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
}
setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo);
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* fillResult = NULL;
@ -1053,7 +1056,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
}
break;
}
@ -1071,7 +1075,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
switch (pBlock->info.type) {
case STREAM_RETRIEVE:
return pBlock;
(*ppRes) = pBlock;
return code;
case STREAM_DELETE_RESULT: {
pInfo->pSrcDelBlock = pBlock;
pInfo->srcDelRowIndex = 0;
@ -1082,7 +1087,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
(*ppRes) = pInfo->pDelRes;
return code;
}
continue;
} break;
@ -1097,7 +1103,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
} break;
case STREAM_CHECKPOINT:
case STREAM_CREATE_CHILD_TABLE: {
return pBlock;
(*ppRes) = pBlock;
return code;
} break;
default:
ASSERTS(false, "invalid SSDataBlock type");
@ -1121,20 +1128,30 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
if (pInfo->pRes->info.rows == 0) {
setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo);
return NULL;
(*ppRes) = NULL;
return code;
}
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
(*ppRes) = pInfo->pRes;
return code;
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setOperatorCompleted(pOperator);
resetStreamFillInfo(pInfo);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamFillNext(pOperator, &pRes);
return pRes;
}
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {

View File

@ -1504,7 +1504,7 @@ _end:
return code;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
@ -1517,11 +1517,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
} else if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->recvGetAll) {
@ -1532,7 +1534,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
setStreamOperatorCompleted(pOperator);
@ -1542,18 +1545,21 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearStreamIntervalOperator(pInfo);
qDebug("===stream===clear semi operator");
}
return NULL;
(*ppRes) = NULL;
return code;
} else {
if (!IS_FINAL_INTERVAL_OP(pOperator)) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
(*ppRes) = pInfo->pMidRetriveRes;
return code;
}
}
}
@ -1569,7 +1575,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pTaskInfo)) {
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
@ -1625,7 +1632,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
}
return pInfo->pDelRes;
(*ppRes) = pInfo->pDelRes;
return code;
}
break;
@ -1652,7 +1660,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAPI->stateStore.streamStateCommit(pInfo->pState);
doStreamIntervalSaveCheckpoint(pOperator);
@ -1705,21 +1714,31 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
(*ppRes) = pInfo->pMidRetriveRes;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes);
return pRes;
}
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
@ -2205,15 +2224,11 @@ void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHa
void* pVal = tSimpleHashGet(pHashMap, &key, sizeof(SSessionKey));
if (pVal) {
releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore);
code = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
if (code != TSDB_CODE_SUCCESS) {
qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
code = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
if (code != TSDB_CODE_SUCCESS) {
qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
int32_t tmpRes = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
int32_t tmpRes = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
@ -2322,10 +2337,8 @@ void doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, pKey);
SSessionKey hashKey = {0};
getSessionHashKey(pKey, &hashKey);
int32_t code = tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
if (code != TSDB_CODE_SUCCESS) {
qTrace("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
int32_t tmpRes = tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
void setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo) {
@ -3242,7 +3255,7 @@ _end:
return code;
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExprSupp* pSup = &pOperator->exprSupp;
@ -3252,11 +3265,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
} else if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
(*ppRes) = opRes;
return code;
}
if (pInfo->recvGetAll) {
@ -3267,11 +3282,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -3324,7 +3341,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamSessionSaveCheckpoint(pOperator);
@ -3403,15 +3421,24 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
(*ppRes) = opRes;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamSessionAggNext(pOperator, &pRes);
return pRes;
}
void streamSessionReleaseState(SOperatorInfo* pOperator) {
@ -3748,7 +3775,7 @@ _end:
return code;
}
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -3760,13 +3787,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
{
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
(*ppRes) = opRes;
return code;
}
if (pInfo->clearState) {
@ -3779,13 +3808,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
clearFunctionContext(&pOperator->exprSupp);
// semi session operator clear disk buffer
clearStreamSessionOperator(pInfo);
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
}
@ -3818,7 +3849,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamSessionSaveCheckpoint(pOperator);
@ -3859,11 +3891,13 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SSDataBlock* opRes = buildSessionResult(pOperator);
if (opRes) {
return opRes;
(*ppRes) = opRes;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
@ -3871,7 +3905,14 @@ _end:
// semi session operator clear disk buffer
clearStreamSessionOperator(pInfo);
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamSessionSemiAggNext(pOperator, &pRes);
return pRes;
}
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -4248,8 +4289,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
code = appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
code = tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
QUERY_CHECK_CODE(code, lino, _end);
int32_t tmpRes = tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin);
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
@ -4441,9 +4482,10 @@ static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) {
return NULL;
}
static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -4456,7 +4498,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = buildStateResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->recvGetAll) {
@ -4467,11 +4510,13 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -4503,7 +4548,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
doStreamStateSaveCheckpoint(pOperator);
@ -4553,15 +4599,24 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SSDataBlock* resBlock = buildStateResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamStateAggNext(pOperator, &pRes);
return pRes;
}
void streamStateReleaseState(SOperatorInfo* pOperator) {
@ -4798,7 +4853,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
#endif
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
@ -4809,13 +4864,15 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->recvGetAll) {
@ -4826,11 +4883,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) {
pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -4869,7 +4928,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAPI->stateStore.streamStateCommit(pInfo->pState);
doStreamIntervalSaveCheckpoint(pOperator);
@ -4940,14 +5000,23 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
return buildIntervalResult(pOperator);
(*ppRes) = buildIntervalResult(pOperator);
return code;
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
setStreamOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamIntervalAggNext(pOperator, &pRes);
return pRes;
}
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -5230,7 +5299,7 @@ static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) {
return NULL;
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
@ -5242,27 +5311,32 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
} else if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
setOperatorCompleted(pOperator);
clearFunctionContext(&pOperator->exprSupp);
clearStreamIntervalOperator(pInfo);
qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
return NULL;
(*ppRes) = NULL;
return code;
} else {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->clearState) {
@ -5283,7 +5357,8 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pTaskInfo)) {
qInfo("===stream=== %s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
@ -5329,11 +5404,13 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
}
ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0);
return pInfo->pDelRes;
(*ppRes) = pInfo->pDelRes;
return code;
}
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
(*ppRes) = pBlock;
return code;
} else if (pBlock->info.type == STREAM_PULL_OVER) {
code = processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins,
pInfo->numOfChild, pOperator, &pInfo->recvPullover);
@ -5415,12 +5492,14 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
(*ppRes) = resBlock;
return code;
}
if (pInfo->clearState) {
@ -5431,9 +5510,17 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
_end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return NULL;
(*ppRes) = NULL;
return code;
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStreamMidIntervalAggNext(pOperator, &pRes);
return pRes;
}
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {

View File

@ -1872,8 +1872,9 @@ static void getDBNameFromCondition(SNode* pCondition, const char* dbName) {
nodesWalkExpr(pCondition, getDBNameFromConditionWalker, (char*)dbName);
}
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
// build message and send to mnode to fetch the content of system tables.
int32_t code = TSDB_CODE_SUCCESS;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
char dbName[TSDB_DB_NAME_LEN] = {0};
@ -1881,7 +1882,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
while (1) {
if (isTaskKilled(pOperator->pTaskInfo)) {
setOperatorCompleted(pOperator);
return NULL;
(*ppRes) = NULL;
return code;
}
blockDataCleanup(pInfo->pRes);
@ -1923,13 +1925,21 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
if (pBlock->info.rows == 0) {
continue;
}
return pBlock;
(*ppRes) = pBlock;
return code;
} else {
return NULL;
(*ppRes) = NULL;
return code;
}
}
}
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doSysTableScanNext(pOperator, &pRes);
return pRes;
}
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
@ -2548,11 +2558,12 @@ static int32_t doGetTableRowSize(SReadHandle* pHandle, uint64_t uid, int32_t* ro
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SBlockDistInfo* pBlockScanInfo = pOperator->info;
@ -2602,7 +2613,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return pBlock;
(*ppRes) = pBlock;
return code;
}
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doBlockInfoScanNext(pOperator, &pRes);
return pRes;
}
static void destroyBlockDistScanOperatorInfo(void* param) {

View File

@ -988,12 +988,13 @@ static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
static int32_t doTimesliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
@ -1079,7 +1080,14 @@ _finished:
T_LONG_JMP(pTaskInfo->env, code);
}
return pResBlock->info.rows == 0 ? NULL : pResBlock;
(*ppRes) = pResBlock->info.rows == 0 ? NULL : pResBlock;
return code;
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTimesliceNext(pOperator, &pRes);
return pRes;
}
static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) {

View File

@ -811,7 +811,8 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf
}
}
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId, SExecTaskInfo* pTaskInfo) {
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SOpenWindowInfo openWin = {0};
@ -1053,36 +1054,28 @@ _end:
return code;
}
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
setOperatorCompleted(pOperator);
return NULL;
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
int32_t code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1096,31 +1089,42 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
return code;
}
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doStateWindowAggNext(pOperator, &pRes);
return pRes;
}
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SIntervalAggOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* pBlock = pInfo->binfo.pRes;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
}
code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
int32_t code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1136,7 +1140,20 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
size_t rows = pBlock->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pBlock;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = (rows == 0) ? NULL : pBlock;
return code;
}
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doBuildIntervalResultNext(pOperator, &pRes);
return pRes;
}
static void destroyStateWindowOperatorInfo(void* param) {
@ -1429,9 +1446,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1458,7 +1476,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
return code;
}
int64_t st = taosGetTimestampUs();
@ -1519,7 +1538,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
(*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
return code;
}
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doSessionWindowAggNext(pOperator, &pRes);
return pRes;
}
// todo make this as an non-blocking operator
@ -1883,13 +1909,14 @@ _end:
}
}
static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = pMiaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
@ -1913,7 +1940,14 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pRes;
(*ppRes) = (rows == 0) ? NULL : pRes;
return code;
}
static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = mergeAlignedIntervalAggNext(pOperator, &pRes);
return pRes;
}
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
@ -2151,7 +2185,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
}
}
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2161,7 +2195,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pExpSupp = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
(*ppRes) = NULL;
return code;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
@ -2231,7 +2266,14 @@ _end:
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return (rows == 0) ? NULL : pRes;
(*ppRes) = (rows == 0) ? NULL : pRes;
return code;
}
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doMergeIntervalAggNext(pOperator, &pRes);
return pRes;
}
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,

View File

@ -27,6 +27,8 @@ static void *taosProcessSchedQueue(void *param);
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool schedMalloced = false;
if (NULL == pSched) {
@ -95,23 +97,32 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
atomic_store_8(&pSched->stop, 0);
for (int32_t i = 0; i < numOfThreads; ++i) {
TdThreadAttr attr;
taosThreadAttrInit(&attr);
taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_JOINABLE);
int32_t code = taosThreadCreate(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
taosThreadAttrDestroy(&attr);
if (code != 0) {
uError("%s: failed to create rpc thread(%s)", label, strerror(errno));
taosCleanUpScheduler(pSched);
if (schedMalloced) {
taosMemoryFree(pSched);
}
return NULL;
}
code = taosThreadAttrInit(&attr);
QUERY_CHECK_CODE(code, lino, _end);
code = taosThreadAttrSetDetachState(&attr, PTHREAD_CREATE_JOINABLE);
QUERY_CHECK_CODE(code, lino, _end);
code = taosThreadCreate(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched);
QUERY_CHECK_CODE(code, lino, _end);
(void)taosThreadAttrDestroy(&attr);
++pSched->numOfThreads;
}
uDebug("%s scheduler is initialized, numOfThreads:%d", label, pSched->numOfThreads);
_end:
if (code != TSDB_CODE_SUCCESS) {
taosCleanUpScheduler(pSched);
if (schedMalloced) {
taosMemoryFree(pSched);
}
terrno = code;
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
return (void *)pSched;
}
@ -220,22 +231,22 @@ void taosCleanUpScheduler(void *param) {
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
tsem_post(&pSched->fullSem);
(void)tsem_post(&pSched->fullSem);
}
}
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
taosThreadJoin(pSched->qthread[i], NULL);
(void)taosThreadJoin(pSched->qthread[i], NULL);
taosThreadClear(&pSched->qthread[i]);
}
}
tsem_destroy(&pSched->emptySem);
tsem_destroy(&pSched->fullSem);
taosThreadMutexDestroy(&pSched->queueMutex);
(void)tsem_destroy(&pSched->emptySem);
(void)tsem_destroy(&pSched->fullSem);
(void)taosThreadMutexDestroy(&pSched->queueMutex);
if (pSched->pTimer) {
taosTmrStop(pSched->pTimer);
(void)taosTmrStop(pSched->pTimer);
pSched->pTimer = NULL;
}