fix(stream): add null check.
This commit is contained in:
parent
df516703e1
commit
87b04c4eea
|
@ -570,20 +570,35 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
||||||
taosArrayClear(pVals);
|
taosArrayClear(pVals);
|
||||||
|
|
||||||
int32_t dataIndex = 0;
|
int32_t dataIndex = 0;
|
||||||
|
int64_t ts = 0;
|
||||||
|
|
||||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
const STColumn* pCol = &pTSchema->columns[k];
|
const STColumn* pCol = &pTSchema->columns[k];
|
||||||
|
|
||||||
|
// primary timestamp column, for debug purpose
|
||||||
if (k == 0) {
|
if (k == 0) {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||||
void* colData = colDataGetData(pColData, j);
|
ts = *(int64_t*)colDataGetData(pColData, j);
|
||||||
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
|
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_SET_NULL(pCol)) {
|
if (IS_SET_NULL(pCol)) {
|
||||||
|
if (pCol->flags & COL_IS_KEY) {
|
||||||
|
qError("ts:%" PRId64 " Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
|
||||||
|
pCol->colId, pCol->type);
|
||||||
|
break;
|
||||||
|
}
|
||||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
} else {
|
} else {
|
||||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
|
||||||
if (colDataIsNull_s(pColData, j)) {
|
if (colDataIsNull_s(pColData, j)) {
|
||||||
|
if (pCol->flags & COL_IS_KEY) {
|
||||||
|
qError("ts:%" PRId64 "Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
|
||||||
|
ts, pCol->colId, pCol->type);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
dataIndex++;
|
dataIndex++;
|
||||||
|
@ -607,6 +622,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
||||||
}
|
}
|
||||||
|
|
||||||
SRow* pRow = NULL;
|
SRow* pRow = NULL;
|
||||||
|
tqInfo("result column flag:%d", pTSchema->columns[1].flags);
|
||||||
|
|
||||||
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
|
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
|
||||||
|
|
Loading…
Reference in New Issue