|
|
|
@ -1372,13 +1372,51 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|
|
|
|
return pBlock->info.rows > 0 ? pBlock : NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
|
|
|
|
bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) {
|
|
|
|
|
void* pData = colDataGetData(pCol, rowId);
|
|
|
|
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
|
|
|
|
int32_t colLen = varDataLen(pData);
|
|
|
|
|
int32_t keyLen = varDataLen(pVal);
|
|
|
|
|
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
|
|
|
|
|
colLen = getJsonValueLen(pData);
|
|
|
|
|
keyLen = getJsonValueLen(pVal);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (colLen == keyLen && memcmp(pData, pVal, colLen) == 0) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (memcmp(pData, pVal, pCol->info.bytes) == 0) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool hasPrimaryKey(SStreamScanInfo* pInfo) {
|
|
|
|
|
return pInfo->primaryKeyIndex != -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) {
|
|
|
|
|
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
|
|
|
|
|
if (!pPreRes || pPreRes->info.rows == 0) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
ASSERT(pPreRes->info.rows == 1);
|
|
|
|
|
return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
|
|
|
|
|
|
|
|
|
|
int32_t rowId = 0;
|
|
|
|
|
if (hasPrimaryKey(pInfo)) {
|
|
|
|
|
SColumnInfoData* pPkCol = taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex);
|
|
|
|
|
for (; rowId < pPreRes->info.rows; rowId++) {
|
|
|
|
|
if (comparePrimaryKey(pPkCol, rowId, pVal)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (rowId >= pPreRes->info.rows) {
|
|
|
|
|
qInfo("===stream===read preversion data of primary key failed. ts:%" PRId64 ",version:%" PRId64);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, rowId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
|
|
|
@ -1386,9 +1424,9 @@ static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
|
|
|
|
|
return tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
|
|
|
|
|
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) {
|
|
|
|
|
if (pInfo->partitionSup.needCalc) {
|
|
|
|
|
return getGroupIdByCol(pInfo, uid, ts, maxVersion);
|
|
|
|
|
return getGroupIdByCol(pInfo, uid, ts, maxVersion, pVal);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return getGroupIdByUid(pInfo, uid);
|
|
|
|
@ -1573,6 +1611,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|
|
|
|
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
|
|
|
|
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
|
|
|
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
|
|
|
|
SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
|
|
|
|
|
|
|
|
|
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
|
|
@ -1582,7 +1621,11 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|
|
|
|
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
|
|
|
|
int64_t ver = pSrcBlock->info.version - 1;
|
|
|
|
|
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
|
|
|
|
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver);
|
|
|
|
|
void* pVal = NULL;
|
|
|
|
|
if (hasPrimaryKey(pInfo)) {
|
|
|
|
|
pVal = colDataGetData(pSrcPkCol, i);
|
|
|
|
|
}
|
|
|
|
|
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
|
|
|
|
|
// gap must be 0.
|
|
|
|
|
SSessionKey startWin = {0};
|
|
|
|
|
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
|
|
|
|
@ -1628,6 +1671,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|
|
|
|
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
|
|
|
|
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
|
|
|
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
|
|
|
|
SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
|
|
|
|
|
|
|
|
|
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
|
|
@ -1637,7 +1681,11 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|
|
|
|
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
|
|
|
|
int64_t ver = pSrcBlock->info.version - 1;
|
|
|
|
|
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
|
|
|
|
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver);
|
|
|
|
|
void* pVal = NULL;
|
|
|
|
|
if (hasPrimaryKey(pInfo)) {
|
|
|
|
|
pVal = colDataGetData(pSrcPkCol, i);
|
|
|
|
|
}
|
|
|
|
|
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
|
|
|
|
|
SSessionKey startWin = {.win.skey = startData[i], .win.ekey = endData[i], .groupId = groupId};
|
|
|
|
|
SSessionKey range = {0};
|
|
|
|
|
getCountWinRange(pInfo->windowSup.pStreamAggSup, &startWin, mode, &range);
|
|
|
|
@ -1664,6 +1712,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|
|
|
|
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
|
|
|
|
|
|
|
|
|
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
|
|
|
|
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
|
|
@ -1688,7 +1737,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < rows; i++) {
|
|
|
|
|
uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
|
|
|
|
|
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
|
|
|
|
|
appendDataToSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
|
|
|
|
|
&groupId, NULL);
|
|
|
|
|
}
|
|
|
|
|
printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo));
|
|
|
|
@ -1713,7 +1762,11 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|
|
|
|
uint64_t srcUid = srcUidData[i];
|
|
|
|
|
uint64_t groupId = srcGp[i];
|
|
|
|
|
if (groupId == 0) {
|
|
|
|
|
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
|
|
|
|
|
void* pVal = NULL;
|
|
|
|
|
if (hasPrimaryKey(pInfo)) {
|
|
|
|
|
pVal = colDataGetData(pSrcPkCol, i);
|
|
|
|
|
}
|
|
|
|
|
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal);
|
|
|
|
|
}
|
|
|
|
|
TSKEY calStartTs = srcStartTsCol[i];
|
|
|
|
|
colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
|
|
|
|
@ -1758,6 +1811,8 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|
|
|
|
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
|
|
|
|
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
|
|
|
|
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
|
|
|
|
|
SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
|
|
|
|
|
|
|
|
|
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
|
|
|
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
|
|
|
|
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
|
|
|
@ -1767,12 +1822,17 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|
|
|
|
uint64_t groupId = srcGp[i];
|
|
|
|
|
char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
|
|
|
|
|
if (groupId == 0) {
|
|
|
|
|
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
|
|
|
|
|
void* pVal = NULL;
|
|
|
|
|
if (hasPrimaryKey(pInfo) && pSrcPkCol) {
|
|
|
|
|
pVal = colDataGetData(pSrcPkCol, i);
|
|
|
|
|
}
|
|
|
|
|
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal);
|
|
|
|
|
}
|
|
|
|
|
if (pInfo->tbnameCalSup.pExprInfo) {
|
|
|
|
|
void* parTbname = NULL;
|
|
|
|
|
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
// todo(liuyao) 这里可能需要修改,需要考虑复合主键时,pPreRes包含多行数据。
|
|
|
|
|
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
|
|
|
|
|
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));
|
|
|
|
|
calBlockTbName(pInfo, pPreRes);
|
|
|
|
@ -1783,7 +1843,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|
|
|
|
varDataSetLen(tbname, strlen(varDataVal(tbname)));
|
|
|
|
|
pInfo->stateStore.streamStateFreeVal(parTbname);
|
|
|
|
|
}
|
|
|
|
|
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
|
|
|
|
|
appendDataToSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
|
|
|
|
|
tbname[0] == 0 ? NULL : tbname);
|
|
|
|
|
}
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
@ -1807,13 +1867,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
|
|
|
|
|
uint64_t* pGp, void* pTbName) {
|
|
|
|
|
appendAllColumnToStreamSpecialBlock(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void appendAllColumnToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs,
|
|
|
|
|
TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName) {
|
|
|
|
|
void appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs,
|
|
|
|
|
TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName, void* pPkData) {
|
|
|
|
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
|
|
|
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
|
|
@ -1828,9 +1883,24 @@ void appendAllColumnToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, T
|
|
|
|
|
colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false);
|
|
|
|
|
colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pCalEndTs, false);
|
|
|
|
|
colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
|
|
|
|
|
if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
|
|
|
|
|
SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
|
|
|
|
colDataSetVal(pPkCol, pBlock->info.rows, (const char*)pPkData, pPkData == NULL);
|
|
|
|
|
}
|
|
|
|
|
pBlock->info.rows++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void appendPkToSpecialBlock(SSDataBlock* pBlock, TSKEY* pTsArray, SColumnInfoData* pPkCol, int32_t rowId,
|
|
|
|
|
uint64_t* pUid, uint64_t* pGp, void* pTbName) {
|
|
|
|
|
appendOneRowToSpecialBlockImpl(pBlock, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pUid,
|
|
|
|
|
pGp, pTbName, colDataGetData(pPkCol, rowId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp,
|
|
|
|
|
void* pTbName) {
|
|
|
|
|
appendOneRowToSpecialBlockImpl(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts) {
|
|
|
|
|
bool isExpired = false;
|
|
|
|
|
bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts);
|
|
|
|
@ -1848,6 +1918,11 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|
|
|
|
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
|
|
|
|
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
|
|
|
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
|
|
|
|
|
SColumnInfoData* pPkColDataInfo = NULL;
|
|
|
|
|
if (hasPrimaryKey(pInfo)) {
|
|
|
|
|
pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryKeyIndex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool tableInserted = pInfo->stateStore.updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);
|
|
|
|
|
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
|
|
|
|
SResultRowInfo dumyInfo;
|
|
|
|
@ -1865,17 +1940,15 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|
|
|
|
}
|
|
|
|
|
// must check update info first.
|
|
|
|
|
bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
|
|
|
|
|
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
|
|
|
|
|
bool isDeleted = isClosed && isSignleIntervalWindow(pInfo) &&
|
|
|
|
|
isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore);
|
|
|
|
|
if ((update || closedWin) && out) {
|
|
|
|
|
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
|
|
|
|
|
if ((update || isDeleted) && out) {
|
|
|
|
|
qDebug("stream update check not pass, update %d, deleted Win %d", update, isDeleted);
|
|
|
|
|
uint64_t gpId = 0;
|
|
|
|
|
appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
|
|
|
|
|
NULL);
|
|
|
|
|
if (closedWin && pInfo->partitionSup.needCalc) {
|
|
|
|
|
appendPkToSpecialBlock(pInfo->pUpdateDataRes, tsCol, pPkColDataInfo, rowId, &pBlock->info.id.uid, &gpId, NULL);
|
|
|
|
|
if (isDeleted && pInfo->partitionSup.needCalc) {
|
|
|
|
|
gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
|
|
|
|
|
appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
|
|
|
|
|
&gpId, NULL);
|
|
|
|
|
appendPkToSpecialBlock(pInfo->pUpdateDataRes, tsCol, pPkColDataInfo, rowId, &pBlock->info.id.uid, &gpId, NULL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -2809,6 +2882,14 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void addPrimaryKeyCol(SSDataBlock* pBlock, uint8_t type, int32_t bytes) {
|
|
|
|
|
pBlock->info.rowSize += bytes;
|
|
|
|
|
SColumnInfoData infoData = {0};
|
|
|
|
|
infoData.info.type = type;
|
|
|
|
|
infoData.info.bytes = bytes;
|
|
|
|
|
taosArrayPush(pBlock->pDataBlock, &infoData);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
|
|
|
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
|
|
|
|
SArray* pColIds = NULL;
|
|
|
|
@ -2837,6 +2918,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SDataType pkType = {0};
|
|
|
|
|
pInfo->primaryKeyIndex = -1;
|
|
|
|
|
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
|
|
|
|
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
|
|
|
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
|
|
@ -2847,6 +2930,10 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|
|
|
|
if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
|
|
|
|
pInfo->primaryTsIndex = id->dstSlotId;
|
|
|
|
|
}
|
|
|
|
|
if (id->isPk) {
|
|
|
|
|
pInfo->primaryKeyIndex = id->dstSlotId;
|
|
|
|
|
pkType = id->dataType;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pTableScanNode->pSubtable != NULL) {
|
|
|
|
@ -2964,6 +3051,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|
|
|
|
pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
|
|
|
|
|
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
|
|
|
|
|
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
|
|
|
|
|
if (hasPrimaryKey(pInfo)) {
|
|
|
|
|
addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes);
|
|
|
|
|
}
|
|
|
|
|
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
|
|
|
|
pInfo->partitionSup.needCalc = false;
|
|
|
|
|
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
|
|
|
|
|