fix: set tablefield of stream block to NULL
This commit is contained in:
parent
6d4d79a4d3
commit
73be7ddbc6
|
@ -626,6 +626,9 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
|
|||
tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
TSDB_CHECK_NULL(tmp, code, line, END, terrno)
|
||||
colDataSetNULL(tmp, i);
|
||||
tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
TSDB_CHECK_NULL(tmp, code, line, END, terrno)
|
||||
colDataSetNULL(tmp, i);
|
||||
}
|
||||
|
||||
if (type == 0) {
|
||||
|
|
|
@ -2115,6 +2115,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t groupId = pSrcGp[i];
|
||||
if (groupId == 0) {
|
||||
|
@ -2152,7 +2153,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
colDataSetNULL(pDestCalStartTsCol, i);
|
||||
colDataSetNULL(pDestCalEndTsCol, i);
|
||||
colDataSetNULL(pDestTableNameInxCol, i);
|
||||
pDestBlock->info.rows++;
|
||||
}
|
||||
|
||||
|
@ -2205,6 +2206,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t groupId = pSrcGp[i];
|
||||
if (groupId == 0) {
|
||||
|
@ -2233,6 +2235,8 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|||
code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&range.win.ekey, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
colDataSetNULL(pDestTableNameInxCol, i);
|
||||
|
||||
pDestBlock->info.rows++;
|
||||
}
|
||||
|
||||
|
@ -2287,6 +2291,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestTableNameInxCol = taosArrayGet(pDestBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows;) {
|
||||
uint64_t srcUid = srcUidData[i];
|
||||
uint64_t groupId = srcGp[i];
|
||||
|
@ -2319,6 +2324,8 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
code = colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
colDataSetNULL(pDestTableNameInxCol, pDestBlock->info.rows);
|
||||
|
||||
pDestBlock->info.rows++;
|
||||
}
|
||||
|
||||
|
@ -3074,6 +3081,7 @@ static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, S
|
|||
colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
|
||||
colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
|
||||
colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
|
||||
colDataSetNULL(taosArrayGet(pDst->pDataBlock, TABLE_NAME_COLUMN_INDEX), j);
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
@ -3670,6 +3678,9 @@ FETCH_NEXT_BLOCK:
|
|||
// printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo));
|
||||
(*ppRes) = pInfo->pCheckpointRes;
|
||||
return code;
|
||||
} else {
|
||||
qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id);
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
_end:
|
||||
|
|
Loading…
Reference in New Issue