Merge pull request #26823 from taosdata/fix/TD-30967-3
adj operator result
This commit is contained in:
commit
d730e99959
|
@ -7356,6 +7356,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
||||||
if (tEncodeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
|
if (tEncodeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -7367,7 +7368,6 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
||||||
pHead->vgId = htonl(pReq->head.vgId);
|
pHead->vgId = htonl(pReq->head.vgId);
|
||||||
pHead->contLen = htonl(tlen + headLen);
|
pHead->contLen = htonl(tlen + headLen);
|
||||||
}
|
}
|
||||||
if (tEncodeI8(&encoder, pReq->enableBatchMeta) < 0) return -1;
|
|
||||||
|
|
||||||
return tlen + headLen;
|
return tlen + headLen;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1002,7 +1002,7 @@ int32_t doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, S
|
||||||
|
|
||||||
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
||||||
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
|
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
|
||||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
|
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,7 +295,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
|
code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
|
||||||
if (NULL == pTaskInfo->pRoot || code != 0) {
|
if (NULL == pTaskInfo->pRoot || code != 0) {
|
||||||
taosMemoryFree(pTaskInfo);
|
taosMemoryFree(pTaskInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1160,7 +1160,12 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||||
|
|
||||||
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
|
int32_t code = tOffsetCopy(pOffset, &pTaskInfo->streamInfo.currentOffset);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
pTaskInfo->code = code;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
|
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
|
||||||
|
@ -1231,7 +1236,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
|
code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
|
||||||
if (pOperator == NULL || code != 0) {
|
if (pOperator == NULL || code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,7 +75,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
||||||
static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
||||||
|
|
||||||
static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||||
bool createDummyCol);
|
bool createDummyCol);
|
||||||
|
@ -193,7 +193,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
// add a new result set for a new group
|
// add a new result set for a new group
|
||||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
||||||
sizeof(SResultRowPosition));
|
sizeof(SResultRowPosition));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -522,7 +522,7 @@ int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t n
|
||||||
if (!pResInfo->initialized) {
|
if (!pResInfo->initialized) {
|
||||||
if (pCtx[i].functionId != -1) {
|
if (pCtx[i].functionId != -1) {
|
||||||
int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
int32_t code = pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)){
|
if (code != TSDB_CODE_SUCCESS && fmIsUserDefinedFunc(pCtx[i].functionId)) {
|
||||||
pResInfo->initialized = false;
|
pResInfo->initialized = false;
|
||||||
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
|
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
|
||||||
}
|
}
|
||||||
|
@ -567,7 +567,8 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
|
||||||
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
|
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
|
||||||
QUERY_CHECK_CODE(code, lino, _err);
|
QUERY_CHECK_CODE(code, lino, _err);
|
||||||
|
|
||||||
extractQualifiedTupleByFilterResult(pBlock, p, status);
|
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _err);
|
||||||
|
|
||||||
if (pColMatchInfo != NULL) {
|
if (pColMatchInfo != NULL) {
|
||||||
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
||||||
|
@ -591,18 +592,20 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
|
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int8_t* pIndicator = (int8_t*)p->pData;
|
int8_t* pIndicator = (int8_t*)p->pData;
|
||||||
if (status == FILTER_RESULT_ALL_QUALIFIED) {
|
if (status == FILTER_RESULT_ALL_QUALIFIED) {
|
||||||
// here nothing needs to be done
|
// here nothing needs to be done
|
||||||
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
|
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, NULL);
|
code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
|
||||||
pBlock->info.rows = 0;
|
pBlock->info.rows = 0;
|
||||||
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
|
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
|
code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
|
||||||
} else {
|
} else {
|
||||||
qError("unknown filter result type: %d", status);
|
qError("unknown filter result type: %d", status);
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
|
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
|
||||||
|
@ -639,7 +642,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
|
||||||
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||||
if (pCtx[j].fpSet.finalize) {
|
if (pCtx[j].fpSet.finalize) {
|
||||||
if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 ||
|
if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0 ||
|
||||||
strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) {
|
strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_const_value") == 0) {
|
||||||
// for groupkey along with functions that output multiple lines(e.g. Histogram)
|
// for groupkey along with functions that output multiple lines(e.g. Histogram)
|
||||||
// need to match groupkey result for each output row of that function.
|
// need to match groupkey result for each output row of that function.
|
||||||
if (pCtx[j].resultInfo->numOfRes != 0) {
|
if (pCtx[j].resultInfo->numOfRes != 0) {
|
||||||
|
@ -678,7 +681,7 @@ _end:
|
||||||
|
|
||||||
// todo refactor. SResultRow has direct pointer in miainfo
|
// todo refactor. SResultRow has direct pointer in miainfo
|
||||||
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
|
||||||
if (page == NULL) {
|
if (page == NULL) {
|
||||||
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||||
|
@ -694,7 +697,7 @@ void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPositi
|
||||||
doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
|
doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
|
||||||
if (pRow->numOfRows == 0) {
|
if (pRow->numOfRows == 0) {
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
return ;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = pBlock->info.capacity;
|
int32_t size = pBlock->info.capacity;
|
||||||
|
|
|
@ -308,7 +308,10 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extractQualifiedTupleByFilterResult(pBlock, p, status);
|
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -375,7 +378,10 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extractQualifiedTupleByFilterResult(pBlock, p, status);
|
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
|
|
@ -1311,7 +1311,8 @@ static void destroyTableScanOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||||
|
SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1404,7 +1405,7 @@ _error:
|
||||||
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -1422,7 +1423,7 @@ int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskIn
|
||||||
*pOptrInfo = pOperator;
|
*pOptrInfo = pOperator;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
}
|
}
|
||||||
|
@ -2413,10 +2414,14 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
static int32_t doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
bool* p = NULL;
|
||||||
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
|
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
|
||||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||||
bool hasUnqualified = false;
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
bool hasUnqualified = false;
|
||||||
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||||
|
|
||||||
|
@ -2445,19 +2450,28 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasUnqualified) {
|
if (hasUnqualified) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
code = trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(p);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
taosMemoryFree(p);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offset) {
|
static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offset) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
if (pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0) {
|
if (pBlock->info.window.skey != offset->ts || offset->primaryKey.type == 0) {
|
||||||
return;
|
return code;
|
||||||
}
|
}
|
||||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||||
bool hasUnqualified = false;
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
bool hasUnqualified = false;
|
||||||
|
|
||||||
SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pColTs = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
|
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
|
||||||
|
@ -2485,16 +2499,26 @@ static void doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* offse
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasUnqualified) {
|
if (hasUnqualified) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
code = trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// re-build the delete block, ONLY according to the split timestamp
|
// re-build the delete block, ONLY according to the split timestamp
|
||||||
static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
|
static int32_t rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
|
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
|
||||||
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
|
||||||
bool hasUnqualified = false;
|
bool hasUnqualified = false;
|
||||||
int64_t skey = pWindow->skey;
|
int64_t skey = pWindow->skey;
|
||||||
int64_t ekey = pWindow->ekey;
|
int64_t ekey = pWindow->ekey;
|
||||||
|
@ -2531,14 +2555,20 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasUnqualified) {
|
if (hasUnqualified) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
code = trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
qDebug("%s re-build delete datablock, start key revised to:%" PRId64 ", rows:%" PRId64, id, skey,
|
qDebug("%s re-build delete datablock, start key revised to:%" PRId64 ", rows:%" PRId64, id, skey,
|
||||||
pBlock->info.rows);
|
pBlock->info.rows);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s not update the delete block", id);
|
qDebug("%s not update the delete block", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t colIdComparFn(const void* param1, const void* param2) {
|
static int32_t colIdComparFn(const void* param1, const void* param2) {
|
||||||
|
@ -2657,7 +2687,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
// filter the block extracted from WAL files, according to the time window apply additional time window filter
|
// filter the block extracted from WAL files, according to the time window apply additional time window filter
|
||||||
doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
|
code = doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
pInfo->pRes->info.dataLoad = 1;
|
pInfo->pRes->info.dataLoad = 1;
|
||||||
|
|
||||||
code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
@ -2676,14 +2707,19 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
|
static int32_t processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffsetVal* offset) {
|
||||||
SValue val = {0};
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SValue val = {0};
|
||||||
if (hasPrimaryKey) {
|
if (hasPrimaryKey) {
|
||||||
doBlockDataPrimaryKeyFilter(pBlock, offset);
|
code = doBlockDataPrimaryKeyFilter(pBlock, offset);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
|
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
|
||||||
|
|
||||||
if (pBlock->info.rows < 1) {
|
if (pBlock->info.rows < 1) {
|
||||||
return;
|
return code;
|
||||||
}
|
}
|
||||||
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
|
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
|
||||||
val.type = pColPk->info.type;
|
val.type = pColPk->info.type;
|
||||||
|
@ -2696,6 +2732,7 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
|
tqOffsetResetToData(offset, pBlock->info.id.uid, pBlock->info.window.ekey, val);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
@ -2711,7 +2748,7 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
(*ppRes) = NULL;
|
(*ppRes) = NULL;
|
||||||
return code;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
@ -2720,9 +2757,9 @@ static int32_t doQueueScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
|
||||||
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
code = processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
||||||
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
|
qDebug("tmqsnap doQueueScan get data utid:%" PRId64 "", pResult->info.id.uid);
|
||||||
if (pResult->info.rows > 0) {
|
if (pResult->info.rows > 0 || code != TSDB_CODE_SUCCESS) {
|
||||||
(*ppRes) = pResult;
|
(*ppRes) = pResult;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3156,7 +3193,8 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
code = setBlockGroupIdByUid(pInfo, pDelBlock);
|
code = setBlockGroupIdByUid(pInfo, pDelBlock);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
|
code = rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered",
|
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered",
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (pDelBlock->info.rows == 0) {
|
if (pDelBlock->info.rows == 0) {
|
||||||
|
@ -3479,7 +3517,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
if (pBlock && pBlock->info.rows > 0) {
|
if (pBlock && pBlock->info.rows > 0) {
|
||||||
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
|
bool hasPrimaryKey = pAPI->snapshotFn.taosXGetTablePrimaryKey(pInfo->sContext);
|
||||||
processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
code = processPrimaryKey(pBlock, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
|
||||||
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
|
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
|
||||||
(*ppRes) = pBlock;
|
(*ppRes) = pBlock;
|
||||||
return code;
|
return code;
|
||||||
|
@ -3493,7 +3531,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
tDeleteSchemaWrapper(mtInfo.schema);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
STqOffsetVal offset = {0};
|
STqOffsetVal offset = {0};
|
||||||
if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal
|
if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal
|
||||||
qDebug("tmqsnap read snapshot done, change to get data from wal");
|
qDebug("tmqsnap read snapshot done, change to get data from wal");
|
||||||
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1);
|
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1);
|
||||||
|
@ -3964,7 +4002,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
|
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
|
||||||
createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes);
|
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
if (hasPrimaryKeyCol(pInfo)) {
|
if (hasPrimaryKeyCol(pInfo)) {
|
||||||
|
@ -4123,7 +4161,7 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext;
|
STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext;
|
||||||
SColumnNode* pSColumnNode = NULL;
|
SColumnNode* pSColumnNode = NULL;
|
||||||
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
|
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
|
||||||
pSColumnNode = *(SColumnNode**)pNode;
|
pSColumnNode = *(SColumnNode**)pNode;
|
||||||
} else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
|
} else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
|
||||||
|
@ -4484,8 +4522,8 @@ static void destroyTagScanOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
|
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
|
||||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -5083,7 +5121,7 @@ _end:
|
||||||
|
|
||||||
static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes);
|
int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes);
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5317,7 +5355,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->pSortHandle = NULL;
|
pInfo->pSortHandle = NULL;
|
||||||
code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0, &pInfo->pSortHandle);
|
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0, &pInfo->pSortHandle);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -5569,7 +5607,7 @@ _end:
|
||||||
|
|
||||||
static SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
int32_t code = doTableMergeScanNext(pOperator, &pRes);
|
int32_t code = doTableMergeScanNext(pOperator, &pRes);
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5626,10 +5664,11 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
|
||||||
|
SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
|
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -5641,7 +5680,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
|
||||||
&pInfo->base.matchInfo);
|
&pInfo->base.matchInfo);
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -5867,7 +5906,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
|
int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
|
||||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||||
QRY_OPTR_CHECK(pOptrInfo);
|
QRY_OPTR_CHECK(pOptrInfo);
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -6040,7 +6079,7 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STableCountScanOperatorInfo* pInfo = pOperator->info;
|
STableCountScanOperatorInfo* pInfo = pOperator->info;
|
||||||
STableCountScanSupp* pSupp = &pInfo->supp;
|
STableCountScanSupp* pSupp = &pInfo->supp;
|
||||||
|
@ -6062,7 +6101,7 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
|
|
||||||
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
int32_t code = doTableCountScanNext(pOperator, &pRes);
|
int32_t code = doTableCountScanNext(pOperator, &pRes);
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -510,18 +510,16 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
||||||
int32_t mapSize = 0;
|
int32_t mapSize = 0;
|
||||||
buf = taosDecodeFixedI32(buf, &mapSize);
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
for (int32_t i = 0; i < mapSize; i++) {
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
SSessionKey key = {0};
|
|
||||||
SResultWindowInfo winfo = {0};
|
SResultWindowInfo winfo = {0};
|
||||||
buf = decodeSSessionKey(buf, &key);
|
buf = decodeSSessionKey(buf, &winfo.sessionWin);
|
||||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
|
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
|
||||||
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
|
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
ASSERT(winCode == TSDB_CODE_SUCCESS);
|
|
||||||
|
|
||||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
code =
|
code =
|
||||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5183,7 +5183,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
||||||
pInfo->delIndex = 0;
|
pInfo->delIndex = 0;
|
||||||
|
|
||||||
createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
|
|
|
@ -189,11 +189,11 @@ run tsim/stream/checkTaskStatus.sim
|
||||||
sql insert into t2 values(1648791213001,1,1,3,1.0,1);
|
sql insert into t2 values(1648791213001,1,1,3,1.0,1);
|
||||||
sql insert into t2 values(1648791213002,2,2,6,3.4,2);
|
sql insert into t2 values(1648791213002,2,2,6,3.4,2);
|
||||||
sql insert into t2 values(1648791213003,4,9,3,4.8,3);
|
sql insert into t2 values(1648791213003,4,9,3,4.8,3);
|
||||||
sql insert into t2 values(1648791233003,3,4,3,2.1,4);
|
|
||||||
sql insert into t2 values(1648791233004,3,5,3,3.4,5);
|
|
||||||
sql insert into t2 values(1648791233005,3,6,3,7.6,6);
|
|
||||||
|
|
||||||
#
|
sql insert into t2 values(1648791233003,3,4,3,2.1,4) (1648791233004,3,5,3,3.4,5) (1648791233005,3,6,3,7.6,6);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t2 values(1648791223003,20,7,3,10.1,7);
|
sql insert into t2 values(1648791223003,20,7,3,10.1,7);
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
Loading…
Reference in New Issue