adj operator result

This commit is contained in:
54liuyao 2024-07-19 16:15:37 +08:00
parent 6a71994570
commit 13b75ba72a
16 changed files with 1850 additions and 1178 deletions

View File

@ -247,7 +247,7 @@ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockI
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize);
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n);
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc);

View File

@ -135,6 +135,8 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
} \
} while (0)
#define QUERY_CHECK_CODE TSDB_CHECK_CODE
#define TSDB_CHECK_NULL(ptr, CODE, LINO, LABEL, ERRNO) \
if ((ptr) == NULL) { \
(CODE) = (ERRNO); \
@ -142,6 +144,8 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
goto LABEL; \
}
#define QUERY_CHECK_NULL TSDB_CHECK_NULL
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
#define VND_CHECK_CODE(CODE, LINO, LABEL) TSDB_CHECK_CODE(CODE, LINO, LABEL)

View File

@ -1990,14 +1990,14 @@ static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
}
}
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
if (n == 0) {
blockDataEmpty(pBlock);
return TSDB_CODE_SUCCESS;
return ;
}
if (pBlock->info.rows <= n) {
return TSDB_CODE_SUCCESS;
return ;
} else {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
@ -2007,7 +2007,7 @@ int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
pBlock->info.rows = n;
}
return TSDB_CODE_SUCCESS;
return ;
}
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {

View File

@ -862,10 +862,11 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
void setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId);
void setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
int32_t setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId,
const char* name);
int32_t setVgIdColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int32_t vgId);
int32_t setVgVerColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, int64_t vgVer);
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowEntryInfoOffset);
@ -897,8 +898,8 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
SStateStore* pStore);
void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp,
void* pTbName);
int32_t appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp,
void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);

View File

@ -1166,7 +1166,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAPI->stateStore.streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname,
false, &winCode);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode == TSDB_CODE_SUCCESS) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
@ -1199,7 +1199,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
void* pValue = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
@ -1209,7 +1209,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
if (pTableSup->numOfExprs > 0) {
code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs,
NULL);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
@ -1222,7 +1222,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len);
code = pAPI->streamStatePutParName(pState, groupId, tbName);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pTmpBlock->info.parTbName, tbName, len);
pDestBlock->info.rows--;
@ -1234,7 +1234,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
if (pTagSup->numOfExprs > 0) {
code = projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pDestBlock->info.rows--;
} else {
memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
@ -1242,7 +1242,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
code = colDataSetVal(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock);
} else {
@ -1269,14 +1269,14 @@ static int32_t buildStreamCreateTableResult(SOperatorInfo* pOperator) {
}
blockDataCleanup(pInfo->pCreateTbRes);
code = blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pTbNameIte != NULL) {
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
code = appendCreateTableRow(pTask->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, pParInfo->groupId,
pSrc, rowId, pInfo->pCreateTbRes, &pTask->storageAPI.stateStore);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
}
@ -1318,7 +1318,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
if (hasRemainTbName(pInfo)) {
code = buildStreamCreateTableResult(pOperator);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes && pInfo->pCreateTbRes->info.rows > 0) {
return pInfo->pCreateTbRes;
}
@ -1373,7 +1373,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, NULL);
code = buildStreamCreateTableResult(pOperator);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes && pInfo->pCreateTbRes->info.rows > 0) {
return pInfo->pCreateTbRes;
}
@ -1569,10 +1569,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
return pOperator;

File diff suppressed because it is too large Load Diff

View File

@ -84,7 +84,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
if (pBuffInfo->winBuffOp == CREATE_NEW_WINDOW) {
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
winCode = TSDB_CODE_FAILED;
} else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
@ -95,7 +95,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
if (winCode == TSDB_CODE_FAILED) {
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
} else {
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
@ -105,7 +105,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
if (winCode == TSDB_CODE_FAILED) {
code = pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (ts < pCurWin->winInfo.sessionWin.win.ekey) {
@ -115,7 +115,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
pAggSup->windowCount,
(void**)&pCurWin->winInfo.pStatePos, &size, &winCode);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (winCode == TSDB_CODE_SUCCESS) {
@ -173,7 +173,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
needDelState = true;
if (pStDeleted && pWinInfo->winInfo.isOutput) {
code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pWinInfo->winInfo.sessionWin.win.skey = pTs[start];
@ -268,11 +268,11 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
if (!pStartTsCol) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
SStreamStateCur* pCur = NULL;
COUNT_TYPE slidingRows = 0;
@ -287,7 +287,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SCountWindowInfo curWin = {0};
buffInfo.rebuildWindow = false;
code = setCountOutputBuf(pAggSup, startTsCols[i], groupId, &curWin, &buffInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (!inCountSlidingWindow(pAggSup, &curWin.winInfo.sessionWin.win, &pSDataBlock->info)) {
buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
@ -296,9 +296,9 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
slidingRows = *curWin.pWindowCount;
if (!buffInfo.rebuildWindow) {
code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
pStDeleted, &buffInfo.rebuildWindow, &winRows);
TSDB_CHECK_CODE(code, lino, _end);
code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, pStDeleted,
&buffInfo.rebuildWindow, &winRows);
QUERY_CHECK_CODE(code, lino, _end);
}
if (buffInfo.rebuildWindow) {
SSessionKey range = {0};
@ -310,24 +310,26 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
range.win.skey = TMIN(startTsCols[i], range.win.skey);
range.win.ekey = TMAX(startTsCols[rows - 1], range.win.ekey);
uint64_t uid = 0;
appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
code =
appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
break;
}
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator, 0);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_COUNT_OP(pOperator)) {
code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
code = saveResult(curWin.winInfo, pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
curWin.winInfo.pStatePos->beUpdated = true;
@ -335,7 +337,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
code =
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (isSlidingCountWindow(pAggSup)) {
@ -425,7 +427,7 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
if (!pInfo) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
// 4.checksum
@ -434,7 +436,7 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
void* pCksum = POINTER_SHIFT(buf, dataLen);
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -447,12 +449,12 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
buf = decodeSSessionKey(buf, &key);
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
code = setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
sizeof(SResultWindowInfo));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
// 2.twAggSup
@ -478,7 +480,7 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) {
pBuf = taosMemoryCalloc(1, len);
if (!pBuf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
len = doStreamCountEncodeOpState(&pBuf, len, pOperator, true);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_CHECKPOINT_NAME,
@ -556,7 +558,7 @@ int32_t doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
doDeleteSessionWindow(pAggSup, &curWin);
if (result) {
code = saveDeleteInfo(result, curWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
}
@ -575,22 +577,22 @@ int32_t deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, S
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
if (!pWins) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (isSlidingCountWindow(pAggSup)) {
code = doDeleteCountWindows(pAggSup, pBlock, pWins);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else {
code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
removeSessionResults(pAggSup, pMapUpdate, pWins);
code = copyDeleteWindowInfo(pWins, pMapDelete);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (needAdd) {
code = copyDeleteWindowInfo(pWins, pPkDelete);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayDestroy(pWins);
@ -653,7 +655,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator);
code = deleteCountWinState(&pInfo->streamAggSup, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, pInfo->pPkDeleted,
add);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CLEAR) {
doResetCountWindows(&pInfo->streamAggSup, pBlock);
@ -661,7 +663,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -669,7 +671,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamCountSaveCheckpoint(pOperator);
code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
@ -678,7 +680,7 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
@ -690,20 +692,20 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
code = closeSessionWindow(pAggSup->pResultRows, &pInfo->twAggSup, pInfo->pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = copyUpdateResult(&pInfo->pStUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
removeSessionDeleteResults(pInfo->pStDeleted, pInfo->pUpdated);
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
SSDataBlock* opRes = buildCountResult(pOperator);
@ -727,7 +729,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
char* pBuff = taosMemoryCalloc(1, resSize);
if (pBuff) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
qDebug("===stream=== count window operator relase state. ");
@ -755,7 +757,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) {
code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
TSKEY ts = *(TSKEY*)pBuf;
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
@ -783,7 +785,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pOperator->pTaskInfo = pTaskInfo;
@ -793,14 +795,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
SExprSupp* pExpSup = &pOperator->exprSupp;
SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
@ -814,7 +816,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->streamAggSup.windowCount = pCountNode->windowCount;
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
@ -834,7 +836,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
@ -853,7 +855,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
strlen(STREAM_COUNT_OP_CHECKPOINT_NAME), &buff, &len);
if (res == TSDB_CODE_SUCCESS) {
code = doStreamCountDecodeOpState(buff, len, pOperator, true);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
taosMemoryFree(buff);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
@ -863,10 +865,10 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
if (downstream) {
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
return pOperator;

View File

@ -154,7 +154,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId};
code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin);
pCurWin->pWinFlag->startFlag = start;
@ -213,7 +213,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
if (pWin->skey > pTsData[i]) {
if (pStDeleted && pWinInfo->winInfo.isOutput) {
code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->winInfo.sessionWin);
pWin->skey = pTsData[i];
@ -278,7 +278,7 @@ static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pC
setEventWindowFlag(pAggSup, &nextWinInfo);
code = compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo,
pStUpdated, pStDeleted, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag;
}
@ -321,7 +321,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
if (!pColDataInfo) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
tsCols = (int64_t*)pColDataInfo->pData;
} else {
@ -337,7 +337,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
int32_t statusStart = 0;
code = filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
.pDataBlock = pSDataBlock->pDataBlock};
@ -349,11 +349,11 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
int32_t statusEnd = 0;
code = filterExecute(pInfo->pEndCondInfo, pSDataBlock, &pColEnd, NULL, paramEnd.numOfCols, &statusEnd);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
int32_t rows = pSDataBlock->info.rows;
code = blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < rows; i += winRows) {
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
@ -367,48 +367,50 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SSessionKey nextWinKey = {0};
code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
&nextWinKey);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
bool rebuild = false;
code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(winRows >= 1);
if (rebuild) {
uint64_t uid = 0;
appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
code = appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
code = tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) &&
!isWindowIncomplete(&curWin)) {
code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
SSessionKey tmpSeInfo = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &tmpSeInfo);
code = tSimpleHashPut(pStDeleted, &tmpSeInfo, sizeof(SSessionKey), NULL, 0);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
}
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator, 0);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = saveSessionOutputBuf(pAggSup, &curWin.winInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->isHistoryOp) {
code = saveResult(curWin.winInfo, pInfo->pAllUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (isWindowIncomplete(&curWin)) {
@ -418,12 +420,12 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator)) {
code = saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curWin.winInfo, pSeUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -432,7 +434,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
code =
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -490,7 +492,7 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
if (!pInfo) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
@ -500,7 +502,7 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
qError("stream event state is invalid");
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
// 1.streamAggSup.pResultRows
@ -513,13 +515,13 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(winCode == TSDB_CODE_SUCCESS);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
code =
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
// 2.twAggSup
@ -622,12 +624,12 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
bool add = pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator);
code = deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted,
pInfo->pPkDeleted, add);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -636,7 +638,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
doStreamEventSaveCheckpoint(pOperator);
pInfo->reCkBlock = true;
code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
@ -645,7 +647,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
@ -656,10 +658,10 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
code = closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pInfo->pSeUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = copyUpdateResult(&pInfo->pSeUpdated, pInfo->pUpdated, sessionKeyCompareAsc);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated);
@ -667,26 +669,26 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
SArray* pHisWins = taosArrayInit(16, sizeof(SEventWindowInfo));
if (!pHisWins) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = copyUpdateResult(&pInfo->pAllUpdated, pHisWins, sessionKeyCompareAsc);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = getMaxTsWins(pHisWins, pInfo->historyWins);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
taosArrayDestroy(pHisWins);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator)) {
code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
SSDataBlock* resBlock = buildEventResult(pOperator);
if (resBlock != NULL) {
@ -733,7 +735,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
void* pBuf = NULL;
code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_EVENT_OP_STATE_NAME,
strlen(STREAM_EVENT_OP_STATE_NAME), &pBuf, &size);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
qDebug("===stream=== event window operator reload state. get result count:%d", num);
@ -757,7 +759,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
pSeKeyBuf[i].groupId, i);
code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
// event window has been deleted
if (!IS_VALID_SESSION_WIN(curInfo.winInfo)) {
@ -766,18 +768,18 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
setEventWindowFlag(pAggSup, &curInfo);
if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) {
code = saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
continue;
}
code = compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey,
curInfo.winInfo.sessionWin.groupId);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
code = saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (!curInfo.pWinFlag->endFlag) {
@ -786,17 +788,17 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curInfo.winInfo, pInfo->pSeUpdated);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) {
code = saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
SSessionKey key = {0};
getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
code =
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
taosMemoryFree(pBuf);
@ -904,7 +906,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
if (res == TSDB_CODE_SUCCESS) {
code = doStreamEventDecodeOpState(buff, len, pOperator);
taosMemoryFree(buff);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
@ -912,16 +914,16 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
return pOperator;

View File

@ -472,7 +472,7 @@ static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t gr
(*pRes) = false;
}
code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pRes) = true;
_end:
@ -493,7 +493,7 @@ static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pF
uint64_t groupId = pBlock->info.id.groupId;
bool ckRes = true;
code = checkResult(pFillSup, ts, groupId, &ckRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pFillSup->hasDelete && !ckRes) {
(*pRes) = true;
@ -512,7 +512,7 @@ static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pF
if (!filled) {
SResultCellData* pCell = getResultCell(pResRow, slotId);
code = setRowCell(pColData, pBlock->info.rows, pCell);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
pBlock->info.rows++;
@ -540,7 +540,7 @@ static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo*
if (inWinRange(&pFillSup->winRange, &st)) {
bool res = true;
code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
@ -561,7 +561,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
bool ckRes = true;
code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if ((pFillSup->hasDelete && !ckRes) ||
!inWinRange(&pFillSup->winRange, &st)) {
@ -588,7 +588,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
if (!filled) {
code = setRowCell(pColData, index, pCell);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
} else {
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
@ -606,7 +606,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
cur.val = taosMemoryCalloc(1, pCell->bytes);
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
code = colDataSetVal(pColData, index, (const char*)cur.val, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
destroySPoint(&cur);
}
}
@ -636,13 +636,13 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
bool res = false;
if (pFillInfo->needFill == false) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
return;
}
if (pFillInfo->pos == FILL_POS_START) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
@ -654,7 +654,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
if (pFillInfo->pos == FILL_POS_MID) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
@ -671,7 +671,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
}
if (pFillInfo->pos == FILL_POS_END) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
@ -731,14 +731,14 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) {
code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
return;
}
pInfo->srcRowIndex++;
}
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
blockDataCleanup(pInfo->pSrcBlock);
_end:
@ -761,14 +761,14 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
code = colDataSetVal(pStartCol, pBlock->info.rows, (const char*)&start, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pEndCol, pBlock->info.rows, (const char*)&end, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pUidCol, pBlock->info.rows);
code = colDataSetVal(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
colDataSetNULL(pCalStartCol, pBlock->info.rows);
colDataSetNULL(pCalEndCol, pBlock->info.rows);
@ -778,7 +778,7 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false, &winCode);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
colDataSetNULL(pTableCol, pBlock->info.rows);
} else {
@ -786,7 +786,7 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pAPI->stateStore.streamStateFreeVal(tbname);
}
@ -808,14 +808,14 @@ static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY
if (hasPrevWindow(pFillSup)) {
TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else if (hasNextWindow(pFillSup)) {
TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
code = buildDeleteRange(pOperator, startTs, end, groupId, delRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else {
code = buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
@ -836,7 +836,7 @@ static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, T
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
if (!pInfo->pFillInfo->needFill) {
code = buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else {
STimeRange tw = {
.skey = startTs,
@ -846,7 +846,7 @@ static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, T
void* tmp = taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -960,7 +960,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
pAPI->stateStore.streamStateFreeCur(pCur);
code = doDeleteFillResultImpl(pOperator, ts, endTs, groupId);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->current = pFillInfo->end + 1;
@ -988,17 +988,17 @@ static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBl
blockDataCleanup(pDstBlock);
code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pDstBlock->info.rows = 0;
pSup = &pInfo->pFillSup->notFillExprSup;
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId;
@ -1076,7 +1076,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pDelRes);
pInfo->pFillSup->hasDelete = true;
code = doDeleteFillResult(pOperator);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@ -1088,7 +1088,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
case STREAM_INVALID:
case STREAM_PULL_DATA: {
code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = -1;
@ -1104,7 +1104,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
doStreamFillImpl(pOperator);
code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
@ -1161,7 +1161,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
if (!pFillSup) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillSup->numOfFillCols = numOfFillCols;
int32_t numOfNotFillCols = 0;
@ -1174,11 +1174,11 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
pFillSup->pAPI = pAPI;
code = initResultBuf(pFillSup);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pFillSup->pResMap = tSimpleHashInit(16, hashFn);
@ -1199,7 +1199,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
SStreamFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SStreamFillInfo));
if (!pFillInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->start = INT64_MIN;
@ -1210,7 +1210,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
if (!pFillInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->pLinearInfo->hasNext = false;
@ -1221,13 +1221,13 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
if (!pFillInfo->pLinearInfo->pEndPoints) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
if (!pFillInfo->pLinearInfo->pNextEndPoints) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
@ -1236,25 +1236,25 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
value.val = taosMemoryCalloc(1, pColData->info.bytes);
if (!value.val) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
if (!tmpRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
value.val = taosMemoryCalloc(1, pColData->info.bytes);
if (!value.val) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
if (!tmpRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
}
@ -1266,14 +1266,14 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
if (!pFillInfo->pResRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->pResRow->key = INT64_MIN;
pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
if (!pFillInfo->pResRow->pRowVal) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
@ -1288,7 +1288,7 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange));
if (!pFillInfo->delRanges) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFillInfo->delIndex = 0;
@ -1311,7 +1311,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
@ -1320,17 +1320,17 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
if (!pInfo->pFillSup) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
if (!pInfo->pFillInfo) {
@ -1371,11 +1371,11 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
if (!pInfo->pDelRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
@ -1383,13 +1383,13 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->srcRowIndex = -1;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
@ -1399,7 +1399,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
return pOperator;
_error:

File diff suppressed because it is too large Load Diff

View File

@ -88,14 +88,14 @@ static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInf
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
void* tmp = taosArrayPush(pWinInfos, &pNewPos);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*ppPos) = pNewPos;
@ -113,14 +113,14 @@ static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWin
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
*ppPos = pNewPos;
@ -138,7 +138,7 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
@ -174,10 +174,10 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
pWinStates = taosArrayInit(16, POINTER_BYTES);
if (!pWinStates) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY startTs = pKey->win.skey;
@ -192,7 +192,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
@ -201,7 +201,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
(*pWinCode) = TSDB_CODE_FAILED;
taosMemoryFree(p);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
goto _end;
}
@ -243,7 +243,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
@ -257,14 +257,14 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
if (index == size - 1) {
code = addNewSessionWindow(pFileState, pWinStates, pKey, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pWinCode) = TSDB_CODE_FAILED;
goto _end;
}
code = insertNewSessionWindow(pFileState, pWinStates, pKey, index + 1, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pWinCode) = TSDB_CODE_FAILED;
@ -296,11 +296,11 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
pWinStates = taosArrayInit(16, POINTER_BYTES);
if (!pWinStates) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = tSimpleHashPut(pSessionBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t size = taosArrayGetSize(pWinStates);
@ -308,7 +308,7 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
void* tmp = taosArrayPush(pWinStates, &pPos);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
goto _end;
}
@ -319,13 +319,13 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
void* tmp = taosArrayInsert(pWinStates, index, &pPos);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
} else {
void* tmp = taosArrayInsert(pWinStates, 0, &pPos);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -343,7 +343,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pNewPos->needFree = true;
pNewPos->beFlushed = true;
@ -416,17 +416,17 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
pWinStates = taosArrayInit(16, POINTER_BYTES);
if (!pWinStates) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else {
pWinStates = (SArray*)(*ppBuff);
}
if (!pCur) {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
goto _end;
}
@ -435,12 +435,12 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
if (pCur->buffIndex >= 0) {
if (pCur->buffIndex >= size) {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, &pNewPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
goto _end;
}
code = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex, &pNewPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
goto _end;
} else {
@ -452,14 +452,14 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
int32_t winCode = TSDB_CODE_SUCCESS;
code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
ASSERT(winCode == TSDB_CODE_FAILED);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
goto _end;
}
}
pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
@ -793,11 +793,11 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
pWinStates = taosArrayInit(16, POINTER_BYTES);
if (!pWinStates) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY startTs = pWinKey->win.skey;
@ -812,7 +812,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
@ -822,7 +822,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
(*pWinCode) = TSDB_CODE_FAILED;
taosMemoryFree(p);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
goto _end;
}
@ -869,7 +869,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
@ -884,13 +884,13 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (index == size - 1) {
code = addNewSessionWindow(pFileState, pWinStates, key, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pWinCode) = TSDB_CODE_FAILED;
goto _end;
}
code = insertNewSessionWindow(pFileState, pWinStates, key, index + 1, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pWinCode) = TSDB_CODE_FAILED;
@ -933,11 +933,11 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
pWinStates = taosArrayInit(16, POINTER_BYTES);
if (!pWinStates) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY startTs = pWinKey->win.skey;
@ -958,7 +958,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
goto _end;
@ -970,11 +970,11 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
taosMemoryFree(pRockVal);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
} else {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pWinCount) = TSDB_CODE_FAILED;
}
@ -1013,7 +1013,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pWinCount) = code_file;
@ -1037,7 +1037,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
}
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pWinCount) = TSDB_CODE_FAILED;
@ -1061,7 +1061,7 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
} else {
pWinStates = taosArrayInit(16, POINTER_BYTES);
code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY startTs = pWinKey->win.skey;
@ -1077,7 +1077,7 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
@ -1085,11 +1085,11 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
} else {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
taosMemoryFree(p);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
} else {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:

View File

@ -105,14 +105,14 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState));
if (pState->pTdbState == NULL) {
streamStateDestroy(pState, true);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
SStreamTask* pStreamTask = pTask;
@ -121,7 +121,7 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId);
code = streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
SStreamMeta* pMeta = pStreamTask->pMeta;
pState->pTdbState->pOwner = pTask;
@ -130,7 +130,7 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
pState->parNameMap = tSimpleHashInit(1024, hashFn);
if (!pState->parNameMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
pState->taskId);
@ -169,7 +169,7 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
void* pVal = NULL;
int32_t len = getRowStateRowSize(pState->pFileState);
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
@ -187,7 +187,7 @@ int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVa
void* pVal = NULL;
int32_t len = getRowStateRowSize(pState->pFileState);
code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
@ -256,10 +256,10 @@ void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void*
char* cfName = "default";
void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, cfName, batch, pKey, pVal, vLen, 0);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = streamStatePutBatch_rocksdb(pState, batch);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -361,18 +361,18 @@ int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void
goto _end;
}
code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
streamStateReleaseBuf(pState, pos, true);
code = putFreeBuff(pState->pFileState, pos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey,
key->win.ekey, key->groupId, code);
} else {
pos->beFlushed = false;
code = putSessionWinResultBuff(pState->pFileState, value);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -453,10 +453,10 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char
if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {
if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = streamStatePutParName_rocksdb(pState, groupId, tbname);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
@ -477,14 +477,14 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
goto _end;
}
*pVal = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(*pVal, pStr, TSDB_TABLE_NAME_LEN);

View File

@ -78,11 +78,11 @@ int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
SScalableBf* tsSBF = NULL;
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &tsSBF);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
void* res = taosArrayPush(pInfo->pTsSBFs, &tsSBF);
if (!res) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
}
@ -152,7 +152,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
SUpdateInfo* pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pInfo->pTsBuckets = NULL;
pInfo->pTsSBFs = NULL;
@ -170,16 +170,16 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
if (pInfo->pTsSBFs == NULL) {
updateInfoDestroy(pInfo);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = windowSBfAdd(pInfo, bfSize);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
if (pInfo->pTsBuckets == NULL) {
updateInfoDestroy(pInfo);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
TSKEY dumy = 0;
@ -187,7 +187,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
@ -197,7 +197,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
if (!pInfo->pMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pInfo->maxDataVersion = 0;
pInfo->pkColLen = pkLen;
@ -205,12 +205,12 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen);
if (!pInfo->pKeyBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen);
if (!pInfo->pValueBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pkLen != 0) {
pInfo->comparePkRowFn = compareKeyTsAndPk;
@ -234,7 +234,7 @@ static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
int32_t lino = 0;
if (ts <= 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->minTS < 0) {
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
@ -242,13 +242,13 @@ static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
if (index < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (index >= pInfo->numSBFs) {
uint64_t count = index + 1 - pInfo->numSBFs;
windowSBfDelete(pInfo, count);
code = windowSBfAdd(pInfo, count);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
index = pInfo->numSBFs - 1;
}
@ -256,12 +256,12 @@ static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
if (res == NULL) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
void* tmp = taosArrayPush(pInfo->pTsSBFs, &res);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
(*ppSBf) = res;
@ -311,7 +311,7 @@ int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t
}
SScalableBf* pSBf = NULL;
code = getSBf(pInfo, ts, &pSBf);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pSBf) {
if (primaryKeyCol >= 0) {
@ -322,14 +322,14 @@ int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t
// we don't care whether the data is updated or not
int32_t winRes = 0;
code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &winRes);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
void* pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) {
int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff);
code = taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pMaxResTs) = maxTs;
@ -354,7 +354,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
// this window has been closed.
if (pInfo->pCloseWinSBF) {
code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (res == TSDB_CODE_SUCCESS) {
return false;
} else {
@ -366,14 +366,14 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
SScalableBf* pSBf = NULL;
code = getSBf(pInfo, ts, &pSBf);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
int32_t size = taosHashGetSize(pInfo->pMap);
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||
(pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
// pSBf may be a null pointer
if (pSBf) {
@ -385,7 +385,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
// pSBf may be a null pointer
if (pSBf) {
code = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen, &res);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (!pMapMaxTs && maxTs < ts) {
@ -458,67 +458,67 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->interval) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->watermark) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->minTS) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
void* pIte = NULL;
size_t keyLen = 0;
@ -526,27 +526,27 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
void* key = taosHashGetKey(pIte, &keyLen);
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
int32_t valueSize = taosHashGetValueSize(pIte);
if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
tEndEncode(&encoder);
@ -579,7 +579,7 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
void* tmp = taosArrayPush(pInfo->pTsBuckets, &ts);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
}
@ -591,12 +591,12 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = NULL;
code = tScalableBfDecode(&decoder, &pSBf);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
void* tmp = taosArrayPush(pInfo->pTsSBFs, &pSBf);
if (!tmp) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
}
@ -622,7 +622,7 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
@ -662,7 +662,7 @@ bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void
} else {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
return res;

View File

@ -261,7 +261,7 @@ int32_t putFreeBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t lino = 0;
if (pPos->pRowBuff) {
code = tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pPos->pRowBuff = NULL;
}
@ -283,7 +283,7 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) < ts && !pPos->beUsed)) {
code = putFreeBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (!all) {
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
@ -312,7 +312,7 @@ int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlus
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) {
code = tdListAppend(pFlushList, &pPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
@ -358,7 +358,7 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList,
continue;
}
code = tdListAppend(pFlushList, &pPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
@ -385,21 +385,21 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
if (!pFlushList) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
num = TMAX(num, FLUSH_NUM);
code = clearFlushedRowBuff(pFileState, pFlushList, num);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (isListEmpty(pFlushList)) {
code = popUsedBuffs(pFileState, pFlushList, num, false);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (isListEmpty(pFlushList)) {
code = popUsedBuffs(pFileState, pFlushList, num, true);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -411,7 +411,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
while ((pNode = tdListNext(&fIter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
code = putFreeBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
tdListFreeP(pFlushList, destroyRowBuffPosPtr);
@ -456,13 +456,13 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
if (!pPos) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
if (!pPos->pKey) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
void* pBuff = getFreeBuff(pFileState);
@ -481,13 +481,13 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
}
code = clearRowBuff(pFileState);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
pPos->pRowBuff = getFreeBuff(pFileState);
_end:
code = tdListAppend(pFileState->usedBuffs, &pPos);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
_error:
if (code != TSDB_CODE_SUCCESS) {
@ -505,7 +505,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
SRowBuffPos* newPos = getNewRowPos(pFileState);
if (!newPos) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
newPos->beUsed = true;
newPos->beFlushed = false;
@ -537,7 +537,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, keyLen);
@ -556,7 +556,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
}
code = tSimpleHashPut(pFileState->rowStateBuff, pKey, keyLen, &pNewPos, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
if (pVal) {
*pVLen = pFileState->rowSize;
@ -592,7 +592,7 @@ static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos*
int32_t len = 0;
void* pBuff = NULL;
code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
@ -609,7 +609,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
if (pPos->pRowBuff) {
if (pPos->needFree) {
code = recoverSessionRowBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
(*pVal) = pPos->pRowBuff;
goto _end;
@ -621,19 +621,19 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
if (!pPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
pFileState->curRowCount++;
} else {
code = clearRowBuff(pFileState);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
pPos->pRowBuff = getFreeBuff(pFileState);
}
ASSERT(pPos->pRowBuff);
}
code = recoverSessionRowBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
(*pVal) = pPos->pRowBuff;
if (!pPos->needFree) {
@ -680,13 +680,13 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
char* buf = taosMemoryCalloc(1, len);
if (!buf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
void* batch = streamStateCreateBatch();
if (!batch) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
@ -701,14 +701,14 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
0, buf);
taosMemoryFreeClear(pSKey);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
// todo handle failure
memset(buf, 0, len);
}
@ -717,7 +717,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
int32_t numOfElems = streamStateGetBatchSize(batch);
if (numOfElems > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
} else {
goto _end;
}
@ -735,10 +735,10 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
taosMemoryFree(valBuf);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
@ -858,7 +858,7 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);

View File

@ -40,12 +40,12 @@ int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilte
int32_t lino = 0;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
SBloomFilter* pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
if (pBF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pBF->expectedEntries = expectedEntries;
pBF->errorRate = errorRate;
@ -66,7 +66,7 @@ int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilte
if (pBF->buffer == NULL) {
tBloomFilterDestroy(pBF);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
(*ppBF) = pBF;
@ -150,40 +150,40 @@ int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF) {
SBloomFilter* pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
if (!pBF) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pBF->buffer = NULL;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
for (int32_t i = 0; i < pBF->numUnits; i++) {
uint64_t* pUnits = (uint64_t*)pBF->buffer;
if (tDecodeU64(pDecoder, pUnits + i) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pBF->hashFn1 = HASH_FUNCTION_1;
pBF->hashFn2 = HASH_FUNCTION_2;

View File

@ -34,12 +34,12 @@ int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf*
const uint32_t defaultSize = 8;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
SScalableBf* pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
if (pSBf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
pSBf->status = SBF_VALID;
@ -47,14 +47,14 @@ int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf*
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void*));
if (!pSBf->bfArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
SBloomFilter* pNormalBf = NULL;
code = tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
tScalableBfDestroy(pSBf);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pSBf->growth = DEFAULT_GROWTH;
pSBf->hashFn1 = HASH_FUNCTION_1;
@ -72,20 +72,20 @@ int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t le
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
if (!pNormalBf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tBloomFilterIsFull(pNormalBf)) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
}
return tBloomFilterPut(pNormalBf, keyBuf, len);
@ -102,7 +102,7 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
@ -121,7 +121,7 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
TSDB_CHECK_CODE(code, lino, _end);
QUERY_CHECK_CODE(code, lino, _end);
}
}
(*winRes) = tBloomFilterPutHash(pNormalBf, h1, h2);
@ -154,17 +154,17 @@ static int32_t tScalableBfAddFilter(SScalableBf* pSBf, uint64_t expectedEntries,
int32_t lino = 0;
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
SBloomFilter* pNormalBf = NULL;
code = tBloomFilterInit(expectedEntries, errorRate, &pNormalBf);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
if (taosArrayPush(pSBf->bfArray, &pNormalBf) == NULL) {
tBloomFilterDestroy(pNormalBf);
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pSBf->numBits += pNormalBf->numBits;
(*ppNormalBf) = pNormalBf;
@ -210,7 +210,7 @@ int32_t tScalableBfDecode(SDecoder* pDecoder, SScalableBf** ppSBf) {
SScalableBf* pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
if (!pSBf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
@ -218,43 +218,43 @@ int32_t tScalableBfDecode(SDecoder* pDecoder, SScalableBf** ppSBf) {
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (size == 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
pSBf->bfArray = taosArrayInit(size * 2, POINTER_BYTES);
if (!pSBf->bfArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
for (int32_t i = 0; i < size; i++) {
SBloomFilter* pBF = NULL;
code = tBloomFilterDecode(pDecoder, &pBF);
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
void* tmpRes = taosArrayPush(pSBf->bfArray, &pBF);
if (!tmpRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
}
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
if (tDecodeI8(pDecoder, &pSBf->status) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CODE(code, lino, _error);
}
(*ppSBf) = pSBf;