Merge pull request #17786 from taosdata/feature/TD-19975
fix(stream): scalar stream update data
This commit is contained in:
commit
bfdecae3bd
|
@ -62,7 +62,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
|
|||
|
||||
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = (SColumn*) taosArrayGet(pGroupColList, i);
|
||||
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
|
||||
(*keyLen) += pCol->bytes; // actual data + null_flag
|
||||
|
||||
SGroupKeys key = {0};
|
||||
|
@ -397,7 +397,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
return buildGroupResultDataBlock(pOperator);
|
||||
}
|
||||
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
|
||||
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -442,8 +442,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
|
||||
destroyGroupOperatorInfo, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, NULL);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -765,7 +765,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
|||
goto _error;
|
||||
}
|
||||
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
|
@ -819,8 +818,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
|
||||
NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
@ -965,6 +964,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
case STREAM_DELETE_DATA: {
|
||||
copyDataBlock(pInfo->pDelRes, pBlock);
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
printDataBlock(pInfo->pDelRes, "stream partitionby delete");
|
||||
return pInfo->pDelRes;
|
||||
} break;
|
||||
default:
|
||||
|
@ -1014,6 +1014,9 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
|||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->partitionSup = *pParSup;
|
||||
pScanInfo->pPartScalarSup = pExpr;
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
|
||||
|
@ -1108,7 +1111,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
SArray* extractColumnInfo(SNodeList* pNodeList) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||
|
|
|
@ -363,7 +363,8 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
|||
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
|
||||
pLimitInfo->remainOffset -= pBlock->info.rows;
|
||||
pBlock->info.rows = 0;
|
||||
qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo));
|
||||
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset,
|
||||
GET_TASKID(pTaskInfo));
|
||||
} else {
|
||||
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
|
||||
pLimitInfo->remainOffset = 0;
|
||||
|
@ -376,7 +377,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
|
|||
int32_t keep = pBlock->info.rows - overflowRows;
|
||||
|
||||
blockDataKeepFirstNRows(pBlock, keep);
|
||||
qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
||||
qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo));
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
}
|
||||
|
@ -748,13 +749,13 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
|
||||
ASSERT(pInfo->dataReader == NULL);
|
||||
|
||||
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader,
|
||||
GET_TASKID(pTaskInfo));
|
||||
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num,
|
||||
(STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -776,7 +777,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
pInfo->limitInfo.numOfOutputRows = 0;
|
||||
pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset;
|
||||
|
||||
int32_t num = 0;
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
|
||||
|
||||
|
@ -819,8 +820,8 @@ static void destroyTableScanOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -884,7 +885,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pOperator->cost.openCost = 0;
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destroyTableScanOperatorInfo(pInfo);
|
||||
}
|
||||
|
@ -1024,7 +1025,8 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -1041,8 +1043,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
|||
}
|
||||
|
||||
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
|
||||
size_t num = tableListGetSize(pTableListInfo);
|
||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
size_t num = tableListGetSize(pTableListInfo);
|
||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||
|
||||
tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
|
||||
cleanupQueryTableDataCond(&cond);
|
||||
|
@ -1070,7 +1072,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
|||
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
|
@ -1135,8 +1137,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
blockDataCleanup(pBlock);
|
||||
|
||||
STsdbReader* pReader = NULL;
|
||||
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
|
||||
GET_TASKID(pTaskInfo));
|
||||
int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
|
||||
GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
|
@ -1162,7 +1164,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
|
||||
tsdbReaderClose(pReader);
|
||||
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
|
||||
", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
|
||||
", suid:%" PRIu64,
|
||||
pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
|
||||
|
||||
return pBlock->info.rows > 0 ? pBlock : NULL;
|
||||
}
|
||||
|
@ -1178,12 +1181,12 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
|||
|
||||
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
||||
return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid);
|
||||
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
|
||||
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
|
||||
// if (groupId) {
|
||||
// return *groupId;
|
||||
// }
|
||||
// return 0;
|
||||
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
|
||||
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
|
||||
// if (groupId) {
|
||||
// return *groupId;
|
||||
// }
|
||||
// return 0;
|
||||
}
|
||||
|
||||
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
||||
|
@ -1380,7 +1383,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
if (rows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t code = blockDataEnsureCapacity(pDestBlock, rows * 2);
|
||||
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1423,39 +1426,33 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
}
|
||||
|
||||
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||
if (pSrcBlock->info.rows == 0) {
|
||||
blockDataCleanup(pDestBlock);
|
||||
int32_t rows = pSrcBlock->info.rows;
|
||||
if (rows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
blockDataCleanup(pDestBlock);
|
||||
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
||||
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
||||
|
||||
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
int32_t dummy = 0;
|
||||
int64_t version = pSrcBlock->info.version - 1;
|
||||
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
||||
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
int64_t version = pSrcBlock->info.version - 1;
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
|
||||
colDataAppend(pDestStartCol, i, (const char*)(startData + i), false);
|
||||
colDataAppend(pDestEndCol, i, (const char*)(endData + i), false);
|
||||
colDataAppendNULL(pDestUidCol, i);
|
||||
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
|
||||
colDataAppendNULL(pDestCalStartTsCol, i);
|
||||
colDataAppendNULL(pDestCalEndTsCol, i);
|
||||
pDestBlock->info.rows++;
|
||||
uint64_t srcUid = srcUidData[i];
|
||||
uint64_t groupId = srcGp[i];
|
||||
if (groupId == 0) {
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
|
||||
}
|
||||
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, NULL);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1466,6 +1463,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
|
|||
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
|
||||
} else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
|
||||
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
|
||||
} else {
|
||||
code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
|
||||
}
|
||||
pDestBlock->info.type = STREAM_CLEAR;
|
||||
pDestBlock->info.version = pSrcBlock->info.version;
|
||||
|
@ -1894,8 +1893,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
#endif
|
||||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
// TODO: refactor
|
||||
FETCH_NEXT_BLOCK:
|
||||
// TODO: refactor
|
||||
FETCH_NEXT_BLOCK:
|
||||
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
|
||||
if (pInfo->validBlockIndex >= total) {
|
||||
doClearBufferedBlocks(pInfo);
|
||||
|
@ -2022,7 +2021,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
NEXT_SUBMIT_BLK:
|
||||
NEXT_SUBMIT_BLK:
|
||||
while (1) {
|
||||
if (pInfo->tqReader->pMsg == NULL) {
|
||||
if (pInfo->validBlockIndex >= totBlockNum) {
|
||||
|
@ -2278,7 +2277,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
|||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL);
|
||||
return pOperator;
|
||||
|
||||
_end:
|
||||
_end:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
|
@ -2388,7 +2387,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
}
|
||||
|
||||
STableKeyInfo* pList = NULL;
|
||||
int32_t num = 0;
|
||||
int32_t num = 0;
|
||||
tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
|
||||
|
||||
if (pHandle->initTableReader) {
|
||||
|
@ -2467,7 +2466,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
if (pColIds != NULL) {
|
||||
taosArrayDestroy(pColIds);
|
||||
}
|
||||
|
@ -4102,7 +4101,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
|
@ -4241,7 +4240,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -4264,8 +4263,8 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle*
|
|||
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
|
||||
STsdbReader** ppReader, const char* idstr) {
|
||||
STsdbReader* pReader = NULL;
|
||||
void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx);
|
||||
int32_t num = tableEndIdx - tableStartIdx + 1;
|
||||
void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx);
|
||||
int32_t num = tableEndIdx - tableStartIdx + 1;
|
||||
|
||||
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr);
|
||||
if (code != 0) {
|
||||
|
@ -4426,7 +4425,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
|||
|
||||
bool allColumnsHaveAgg = true;
|
||||
SColumnDataAgg** pColAgg = NULL;
|
||||
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
||||
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
|
||||
|
||||
if (allColumnsHaveAgg == true) {
|
||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
@ -4524,7 +4523,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
|
|||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex);
|
||||
void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex);
|
||||
SReadHandle* pHandle = &pInfo->readHandle;
|
||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
|
||||
|
||||
|
|
|
@ -1687,7 +1687,9 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
|
|||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->windowSup.parentType = type;
|
||||
pScanInfo->windowSup.pIntervalAggSup = pSup;
|
||||
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
|
||||
}
|
||||
pScanInfo->interval = *pInterval;
|
||||
pScanInfo->twAggSup = *pTwSup;
|
||||
}
|
||||
|
@ -2453,7 +2455,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
|||
} else { // non-linear interpolation
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
$loop_all = 0
|
||||
looptest:
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams0 into streamt0 as select ts c1, a, abs(b) c4 from t1 partition by a;
|
||||
sql create stream streams1 into streamt1 as select ts c1, a, abs(b) c4 from t1;
|
||||
sql create stream streams2 into streamt2 as select ts c1, a, abs(b) c4 from st partition by tbname;
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1);
|
||||
sql insert into t1 values(1648791213001,1,1,1,1);
|
||||
sql insert into t1 values(1648791213002,1,1,1,1);
|
||||
|
||||
sql insert into t2 values(1648791213000,1,2,2,2);
|
||||
sql insert into t2 values(1648791213001,1,1,1,1);
|
||||
sql insert into t2 values(1648791213002,1,1,1,1);
|
||||
|
||||
sql insert into t1 values(1648791213001,2,11,11,11);
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 200
|
||||
|
||||
sql select * from streamt0 order by a desc;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 3 then
|
||||
print ======streamt0=rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print ======streamt0=data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 11 then
|
||||
print ======streamt0=data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
sql select * from streamt1 order by a desc;
|
||||
|
||||
if $rows != 3 then
|
||||
print ======streamt1=rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print ======streamt1=data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 11 then
|
||||
print ======streamt1=data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt2 order by a desc;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======streamt2=rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print ======streamt2=data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != 11 then
|
||||
print ======streamt2=data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
system sh/stop_dnodes.sh
|
Loading…
Reference in New Issue