Merge pull request #18741 from taosdata/fix/TD-20929
fix:read data and compute group id
This commit is contained in:
commit
2eb496e9d0
|
@ -1218,27 +1218,56 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
||||||
if (rows == 0) {
|
if (rows == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
|
||||||
|
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||||
|
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||||
|
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||||
|
int64_t version = pSrcBlock->info.version - 1;
|
||||||
|
|
||||||
|
if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
|
||||||
|
uint64_t srcUid = srcUidData[0];
|
||||||
|
TSKEY startTs = srcStartTsCol[0];
|
||||||
|
TSKEY endTs = srcEndTsCol[0];
|
||||||
|
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
|
||||||
|
printDataBlock(pPreRes, "pre res");
|
||||||
|
blockDataCleanup(pSrcBlock);
|
||||||
|
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
|
||||||
|
rows = pPreRes->info.rows;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < rows; i++) {
|
||||||
|
uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
|
||||||
|
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
|
||||||
|
&groupId, NULL);
|
||||||
|
}
|
||||||
|
printDataBlock(pSrcBlock, "new delete");
|
||||||
|
}
|
||||||
|
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
||||||
|
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||||
|
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||||
|
srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||||
|
|
||||||
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
|
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
||||||
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
||||||
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
||||||
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
|
||||||
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
|
||||||
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
|
||||||
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
||||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
|
||||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
int64_t version = pSrcBlock->info.version - 1;
|
|
||||||
for (int32_t i = 0; i < rows;) {
|
for (int32_t i = 0; i < rows;) {
|
||||||
uint64_t srcUid = srcUidData[i];
|
uint64_t srcUid = srcUidData[i];
|
||||||
uint64_t groupId = srcGp[i];
|
uint64_t groupId = srcGp[i];
|
||||||
|
@ -1653,13 +1682,6 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
|
uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
|
||||||
colDataAppend(pGpCol, i, (const char*)&groupId, false);
|
colDataAppend(pGpCol, i, (const char*)&groupId, false);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uidCol[i], startTsCol, ts, maxVersion);
|
|
||||||
// if (!pPreRes || pPreRes->info.rows == 0) {
|
|
||||||
// return 0;
|
|
||||||
// }
|
|
||||||
// ASSERT(pPreRes->info.rows == 1);
|
|
||||||
// return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue