adj stream operator result

This commit is contained in:
54liuyao 2024-07-18 16:08:01 +08:00
parent 952908f0c9
commit cc9233fdb3
9 changed files with 277 additions and 193 deletions

View File

@ -939,7 +939,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pAr
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd);
void getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
@ -991,7 +991,7 @@ void doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey);
int32_t saveDeleteInfo(SArray* pWins, SSessionKey key);
void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins);
int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted);
void copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);

View File

@ -140,21 +140,21 @@ static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionK
getSessionHashKey(pKey, &key);
int32_t code = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
if (code != TSDB_CODE_SUCCESS) {
qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
if (code != TSDB_CODE_SUCCESS) {
qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
SSHashObj* pStDeleted, bool* pRebuild) {
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
int32_t num = 0;
for (int32_t i = start; i < rows; i++) {
if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) {
@ -173,9 +173,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
needDelState = true;
if (pStDeleted && pWinInfo->winInfo.isOutput) {
code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
pWinInfo->winInfo.sessionWin.win.skey = pTs[start];
@ -194,7 +192,13 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
}
}
return maxNum;
(*pWinRows) = maxNum;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) {
@ -292,8 +296,9 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
slidingRows = *curWin.pWindowCount;
if (!buffInfo.rebuildWindow) {
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
pStDeleted, &buffInfo.rebuildWindow);
code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
pStDeleted, &buffInfo.rebuildWindow, &winRows);
TSDB_CHECK_CODE(code, lino, _end);
}
if (buffInfo.rebuildWindow) {
SSessionKey range = {0};
@ -655,7 +660,8 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -696,7 +702,8 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
TSDB_CHECK_CODE(code, lino, _end);
if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
TSDB_CHECK_CODE(code, lino, _end);
}
SSDataBlock* opRes = buildCountResult(pOperator);

View File

@ -185,12 +185,14 @@ _error:
int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey,
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start,
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild,
int32_t* pWinRow) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
*pRebuild = false;
if (!pWinInfo->pWinFlag->startFlag && !(starts[start])) {
return 1;
(*pWinRow) = 1;
goto _end;
}
TSKEY maxTs = INT64_MAX;
@ -204,15 +206,14 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
for (int32_t i = start; i < rows; ++i) {
if (pTsData[i] >= maxTs) {
return i - start;
(*pWinRow) = i - start;
goto _end;
}
if (pWin->skey > pTsData[i]) {
if (pStDeleted && pWinInfo->winInfo.isOutput) {
code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->winInfo.sessionWin);
pWin->skey = pTsData[i];
@ -229,7 +230,8 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
} else {
*pRebuild = true;
pWinInfo->pWinFlag->endFlag |= ends[i];
return i + 1 - start;
(*pWinRow) = i + 1 - start;
goto _end;
}
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
@ -238,10 +240,17 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey) {
*pRebuild = true;
}
return i + 1 - start;
(*pWinRow) = i + 1 - start;
goto _end;
}
}
return rows - start;
(*pWinRow) = rows - start;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pCurWin, SSHashObj* pStUpdated,
@ -362,9 +371,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
bool rebuild = false;
winRows =
updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild);
code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows);
TSDB_CHECK_CODE(code, lino, _end);
ASSERT(winRows >= 1);
if (rebuild) {
uint64_t uid = 0;
@ -616,7 +626,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
TSDB_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -668,7 +679,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pHisWins);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
TSDB_CHECK_CODE(code, lino, _end);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);

View File

@ -150,22 +150,6 @@ void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStora
resetFillWindow(&pFillSup->nextNext);
}
void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
resetPrevAndNextWindow(pFillSup, pState, pAPI);
SWinKey key = {.ts = ts, .groupId = groupId};
int32_t curVLen = 0;
int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
pFillSup->cur.key = key.ts;
}
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
@ -477,25 +461,40 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
ASSERT(pFillInfo->pos != FILL_POS_INVALID);
}
static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId) {
static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SWinKey key = {.groupId = groupId, .ts = ts};
if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) {
return false;
(*pRes) = false;
}
int32_t code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
TSDB_CHECK_CODE(code, lino, _end);
(*pRes) = true;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return true;
return code;
}
static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) {
static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock,
bool* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pBlock->info.rows >= pBlock->info.capacity) {
return false;
(*pRes) = false;
goto _end;
}
uint64_t groupId = pBlock->info.id.groupId;
if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) {
return true;
bool ckRes = true;
code = checkResult(pFillSup, ts, groupId, &ckRes);
TSDB_CHECK_CODE(code, lino, _end);
if (pFillSup->hasDelete && !ckRes) {
(*pRes) = true;
goto _end;
}
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
@ -509,14 +508,18 @@ static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill
bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pColData, pBlock->info.rows);
if (!filled) {
SResultCellData* pCell = getResultCell(pResRow, slotId);
int32_t code = setRowCell(pColData, pBlock->info.rows, pCell);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = setRowCell(pColData, pBlock->info.rows, pCell);
TSDB_CHECK_CODE(code, lino, _end);
}
}
pBlock->info.rows++;
return true;
(*pRes) = true;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
@ -527,26 +530,37 @@ static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
}
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if (inWinRange(&pFillSup->winRange, &st)) {
bool res = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock);
if (!res) {
int32_t code = TSDB_CODE_FAILED;
qError("%s failed at line %d since block is full", __func__, __LINE__);
}
bool res = true;
code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &res);
TSDB_CHECK_CODE(code, lino, _end);
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
uint64_t groupId = pBlock->info.id.groupId;
SWinKey key = {.groupId = groupId, .ts = pFillInfo->current};
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if ((pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId)) ||
bool ckRes = true;
code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
TSDB_CHECK_CODE(code, lino, _end);
if ((pFillSup->hasDelete && !ckRes) ||
!inWinRange(&pFillSup->winRange, &st)) {
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
@ -570,10 +584,8 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
if (pFillCol->notFillCol) {
bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
if (!filled) {
int32_t code = setRowCell(pColData, index, pCell);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = setRowCell(pColData, index, pCell);
TSDB_CHECK_CODE(code, lino, _end);
}
} else {
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
@ -590,10 +602,8 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
cur.key = pFillInfo->current;
cur.val = taosMemoryCalloc(1, pCell->bytes);
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
int32_t code = colDataSetVal(pColData, index, (const char*)cur.val, false);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = colDataSetVal(pColData, index, (const char*)cur.val, false);
TSDB_CHECK_CODE(code, lino, _end);
destroySPoint(&cur);
}
}
@ -601,6 +611,11 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
pFillSup->interval.precision);
pBlock->info.rows++;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
@ -613,17 +628,19 @@ static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SRes
}
static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool res = false;
if (pFillInfo->needFill == false) {
bool res = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes);
if (!res) {
int32_t code = TSDB_CODE_FAILED;
qError("%s failed at line %d since block is full", __func__, __LINE__);
}
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
return;
}
if (pFillInfo->pos == FILL_POS_START) {
if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
@ -633,7 +650,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
doStreamFillLinear(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) {
if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
@ -648,10 +667,17 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
}
}
if (pFillInfo->pos == FILL_POS_END) {
if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) {
code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
TSDB_CHECK_CODE(code, lino, _end);
if (res) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,

View File

@ -514,23 +514,22 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
return;
return initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->pUpdateInfo) {
int32_t code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate,
pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at %d since %s", __func__, __LINE__, tstrerror(code));
}
code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate,
pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
TSDB_CHECK_CODE(code, lino, _end);
}
pScanInfo->interval = pInfo->interval;
@ -538,6 +537,12 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
pScanInfo->pState = pInfo->pState;
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
@ -1795,12 +1800,14 @@ _end:
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t code = 0;
if (pInfo == NULL || pOperator == NULL) {
goto _error;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pOperator->exprSupp.hasWindowOrGroup = true;
@ -1830,9 +1837,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
}
int32_t numOfCols = 0;
@ -1850,9 +1855,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
@ -1912,12 +1915,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo);
TSDB_CHECK_CODE(code, lino, _error);
}
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
// for stream
void* buff = NULL;
@ -2162,13 +2164,19 @@ void getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResu
}
int32_t saveDeleteInfo(SArray* pWins, SSessionKey key) {
// key.win.ekey = key.win.skey;
void* res = taosArrayPush(pWins, &key);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* res = taosArrayPush(pWins, &key);
if (!res) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
return TSDB_CODE_SUCCESS;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) {
@ -2240,17 +2248,19 @@ void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SAr
int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, uint64_t groupId, int32_t rows, int32_t start, int64_t gap,
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted,
int32_t* pWinRos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
return i - start;
(*pWinRos) = i - start;
goto _end;
}
if (pWinInfo->sessionWin.win.skey > pStartTs[i]) {
if (pStDeleted && pWinInfo->isOutput) {
int32_t code = saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = saveDeleteRes(pStDeleted, pWinInfo->sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
}
removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->sessionWin);
pWinInfo->sessionWin.win.skey = pStartTs[i];
@ -2261,7 +2271,13 @@ int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo*
}
memcpy(pWinInfo->pStatePos->pKey, &pWinInfo->sessionWin, sizeof(SSessionKey));
}
return rows - start;
(*pWinRos) = rows - start;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
@ -2357,10 +2373,8 @@ int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWi
if (pNextWin->isOutput && pStDeleted) {
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey,
pNextWin->sessionWin.groupId);
int32_t code = saveDeleteRes(pStDeleted, pNextWin->sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = saveDeleteRes(pStDeleted, pNextWin->sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
}
removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, &pNextWin->sessionWin);
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
@ -2499,8 +2513,9 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
continue;
}
setSessionWinOutputInfo(pStUpdated, &winInfo);
winRows = updateSessionWindowInfo(pAggSup, &winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
pAggSup->pResultRows, pStUpdated, pStDeleted);
code = updateSessionWindowInfo(pAggSup, &winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap,
pAggSup->pResultRows, pStUpdated, pStDeleted, &winRows);
TSDB_CHECK_CODE(code, lino, _end);
int64_t winDelta = 0;
if (addGap) {
@ -2749,7 +2764,7 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray,
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
@ -2797,7 +2812,9 @@ _end:
return code;
}
void getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
@ -2806,12 +2823,15 @@ void getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) {
continue;
}
pWinInfo->pStatePos->beUpdated = false;
int32_t code = saveResult(*pWinInfo, pStUpdated);
if (code != TSDB_CODE_SUCCESS) {
pWinInfo->pStatePos->beUpdated = true;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = saveResult(*pWinInfo, pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
@ -3186,21 +3206,27 @@ void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
}
}
void copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest) {
int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (tSimpleHashGetSize(source) == 0) {
return;
goto _end;
}
void* pIte = NULL;
int32_t iter = 0;
size_t keyLen = 0;
while ((pIte = tSimpleHashIterate(source, pIte, &iter)) != NULL) {
SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen);
int32_t code = saveDeleteRes(dest, *pKey);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = saveDeleteRes(dest, *pKey);
TSDB_CHECK_CODE(code, lino, _end);
}
tSimpleHashClear(source);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
@ -3281,7 +3307,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -3350,7 +3377,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_SESSION_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted);
TSDB_CHECK_CODE(code, lino, _end);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
@ -3518,22 +3546,16 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(winInfo, pInfo->pStUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
if (!isCloseWindow(&winInfo.sessionWin.win, &pInfo->twAggSup)) {
code = saveDeleteRes(pInfo->pStDeleted, winInfo.sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
SSessionKey key = {0};
getSessionHashKey(&winInfo.sessionWin, &key);
code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
}
code = saveSessionOutputBuf(pAggSup, &winInfo);
@ -3775,7 +3797,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pInfo->clearState = true;
break;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated);
code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated);
TSDB_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -4098,7 +4121,10 @@ _end:
int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin,
TSKEY* pTs, uint64_t groupId, SColumnInfoData* pKeyCol, int32_t rows, int32_t start,
bool* allEqual, SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) {
bool* allEqual, SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted,
int32_t* pWinRows) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
*allEqual = true;
for (int32_t i = start; i < rows; ++i) {
char* pKeyData = colDataGetData(pKeyCol, i);
@ -4107,20 +4133,20 @@ int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pW
if (IS_VALID_SESSION_WIN(pNextWin->winInfo)) {
// ts belongs to the next window
if (pTs[i] >= pNextWin->winInfo.sessionWin.win.skey) {
return i - start;
(*pWinRows) = i - start;
goto _end;
}
}
} else {
return i - start;
(*pWinRows) = i - start;
goto _end;
}
}
if (pWinInfo->winInfo.sessionWin.win.skey > pTs[i]) {
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
int32_t code = saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
code = saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin);
TSDB_CHECK_CODE(code, lino, _end);
}
removeSessionResult(pAggSup, pSeUpdated, pResultRows, &pWinInfo->winInfo.sessionWin);
pWinInfo->winInfo.sessionWin.win.skey = pTs[i];
@ -4131,7 +4157,13 @@ int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pW
*allEqual = false;
}
}
return rows - start;
(*pWinRows) = rows - start;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
@ -4189,8 +4221,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
pAggSup->pResultRows, pSeUpdated, pStDeleted);
code = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,
pAggSup->pResultRows, pSeUpdated, pStDeleted, &winRows);
TSDB_CHECK_CODE(code, lino, _end);
if (!allEqual) {
uint64_t uid = 0;
appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
@ -4442,7 +4476,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
TSDB_CHECK_CODE(code, lino, _end);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -4483,7 +4518,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->destHasPrimaryKey && IS_NORMAL_STATE_OP(pOperator)) {
copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted);
TSDB_CHECK_CODE(code, lino, _end);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
@ -4582,23 +4618,17 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
code = saveResult(curInfo.winInfo, pInfo->pSeUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) {
code = saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
SSessionKey key = {0};
getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo,
sizeof(SResultWindowInfo));
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
} else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
releaseOutputBuf(pAggSup->pState, nextInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
@ -4896,16 +4926,17 @@ _end:
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
pInfo->interval = (SInterval){
@ -4948,17 +4979,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
&pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
}
pInfo->invertible = false;
@ -5010,11 +5037,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
taosMemoryFree(buff);
}
initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo);
TSDB_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
return pOperator;

View File

@ -936,9 +936,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
}
code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
TSDB_CHECK_CODE(code, lino, _end);
}
TSKEY startTs = pWinKey->win.skey;

View File

@ -239,7 +239,7 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal
// todo refactor
void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
int32_t code = streamStateFillDel_rocksdb(pState, key);
qError("%s at line %d res %d", __func__, __LINE__, code);
qTrace("%s at line %d res %d", __func__, __LINE__, code);
}
void streamStateClear(SStreamState* pState) { streamFileStateClear(pState->pFileState); }

View File

@ -279,7 +279,8 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) {
return false;
}
int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol, TSKEY* pMaxResTs) {
int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol,
TSKEY* pMaxResTs) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pBlock == NULL || pBlock->info.rows == 0) {
@ -652,17 +653,22 @@ _error:
}
bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
bool res = true;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
bool res = true;
if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) {
res = false;
} else {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
int32_t code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
if (code != TSDB_CODE_SUCCESS) {
res = false;
uError("%s failed at line %d since %d", __func__, __LINE__, code);
}
code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
TSDB_CHECK_CODE(code, lino, _error);
}
return res;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return false;
}

View File

@ -500,16 +500,24 @@ _error:
}
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* newPos = getNewRowPos(pFileState);
if (!newPos) {
qError("%s failed at line %d since newPos is null", __func__, __LINE__);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
newPos->beUsed = true;
newPos->beFlushed = false;
newPos->needFree = false;
newPos->beUpdated = true;
return newPos;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return NULL;
}
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,