diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8a4571a4af..dce6b19d1f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -305,8 +305,12 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { if (pCheck->ntbUid == tbUid) { int32_t sz = taosArrayGetSize(pCheck->colIdList); for (int32_t i = 0; i < sz; i++) { - int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); - if (forbidColId == colId) { + int16_t* pForbidColId = taosArrayGet(pCheck->colIdList, i); + if (pForbidColId == NULL) { + continue; + } + + if ((*pForbidColId) == colId) { taosHashCancelIterate(pTq->pCheckInfo, pIter); return -1; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index bd6db483f0..51d20be3e0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -54,6 +54,10 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) { + return terrno; + } + tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); for (int32_t row = 0; row < totalRows; row++) { @@ -297,6 +301,9 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S } else { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); + if (pTagData == NULL) { + continue; + } STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; void* pData = colDataGetData(pTagData, rowId); @@ -329,6 +336,9 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S uint64_t gid = pDataBlock->info.id.groupId; if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) { SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); + if (pGpIdColInfo == NULL) { + continue; + } // todo remove this void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); @@ -656,6 +666,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat // primary timestamp column, for debug purpose if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); + if (pColData == NULL) { + continue; + } + ts = *(int64_t*)colDataGetData(pColData, j); tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts); @@ -682,6 +696,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat } } else { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); + if (pColData == NULL) { + continue; + } + if (colDataIsNull_s(pColData, j)) { if (pCol->flags & COL_IS_KEY) { qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, @@ -993,6 +1011,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL) { + continue; + } + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -1059,6 +1081,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL) { + continue; + } + if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } @@ -1110,6 +1136,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); + if (pExisted == NULL) { + continue; + } + code = doMergeExistedRows(pExisted, &tbData, id); if (code != TSDB_CODE_SUCCESS) { continue; @@ -1137,6 +1167,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* p = taosArrayGet(pBlocks, i); + if (p == NULL) { + continue; + } + if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { return false; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3dc4beca57..b8ebb2254b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -357,9 +357,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { for (int32_t i = 0; i < numOfTasks; ++i) { STaskId* pTaskId = taosArrayGet(pTaskList, i); + if (pTaskId == NULL) { + continue; + } + SStreamTask* pTask = NULL; int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (pTask == NULL) { + if (pTask == NULL || code != 0) { continue; }