fix(stream): set the null column when extracting data from submit data.
This commit is contained in:
parent
674acd0e9f
commit
9f4f4f7f9f
|
@ -2187,6 +2187,17 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, co
|
|||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
static int32_t colIdComparFn(const void* param1, const void * param2) {
|
||||
int32_t p1 = *(int32_t*) param1;
|
||||
int32_t p2 = *(int32_t*) param2;
|
||||
|
||||
if (p1 == p2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (p1 < p2)? -1:1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||
|
@ -2203,6 +2214,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
|
||||
SArray* pColList = taosArrayInit(4, sizeof(int32_t));
|
||||
|
||||
// todo extract method
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||
SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
|
@ -2217,6 +2230,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
||||
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
|
||||
colExists = true;
|
||||
taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -2225,6 +2239,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
if (!colExists) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
||||
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||
taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2240,7 +2255,34 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
|
||||
// reset the error code.
|
||||
terrno = 0;
|
||||
|
||||
for(int32_t i = 0; i < pInfo->numOfPseudoExpr; ++i) {
|
||||
taosArrayPush(pColList, &pInfo->pPseudoExpr[i].base.resSchema.slotId);
|
||||
}
|
||||
}
|
||||
|
||||
taosArraySort(pColList, colIdComparFn);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while(i < taosArrayGetSize(pColList)) {
|
||||
int32_t slot1 = *(int32_t*)taosArrayGet(pColList, i);
|
||||
if (slot1 > j) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j);
|
||||
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||
j += 1;
|
||||
} else {
|
||||
i += 1;
|
||||
j += 1;
|
||||
}
|
||||
}
|
||||
|
||||
while(j < taosArrayGetSize(pInfo->pRes->pDataBlock)) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, j);
|
||||
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||
j += 1;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pColList);
|
||||
|
||||
if (filter) {
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
|
|
Loading…
Reference in New Issue