fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-08-05 14:53:33 +08:00
parent ade444b690
commit a65594bb24
3 changed files with 45 additions and 3 deletions

View File

@ -305,8 +305,12 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
if (pCheck->ntbUid == tbUid) { if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList); int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); int16_t* pForbidColId = taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) { if (pForbidColId == NULL) {
continue;
}
if ((*pForbidColId) == colId) {
taosHashCancelIterate(pTq->pCheckInfo, pIter); taosHashCancelIterate(pTq->pCheckInfo, pIter);
return -1; return -1;
} }

View File

@ -54,6 +54,10 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
return terrno;
}
tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName); tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);
for (int32_t row = 0; row < totalRows; row++) { for (int32_t row = 0; row < totalRows; row++) {
@ -297,6 +301,9 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
} else { } else {
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
if (pTagData == NULL) {
continue;
}
STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
void* pData = colDataGetData(pTagData, rowId); void* pData = colDataGetData(pTagData, rowId);
@ -329,6 +336,9 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
uint64_t gid = pDataBlock->info.id.groupId; uint64_t gid = pDataBlock->info.id.groupId;
if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) { if (taosArrayGetSize(pDataBlock->pDataBlock) > UD_GROUPID_COLUMN_INDEX) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
if (pGpIdColInfo == NULL) {
continue;
}
// todo remove this // todo remove this
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
@ -656,6 +666,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
// primary timestamp column, for debug purpose // primary timestamp column, for debug purpose
if (k == 0) { if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (pColData == NULL) {
continue;
}
ts = *(int64_t*)colDataGetData(pColData, j); ts = *(int64_t*)colDataGetData(pColData, j);
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts); tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
@ -682,6 +696,10 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
} }
} else { } else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (pColData == NULL) {
continue;
}
if (colDataIsNull_s(pColData, j)) { if (colDataIsNull_s(pColData, j)) {
if (pCol->flags & COL_IS_KEY) { if (pCol->flags & COL_IS_KEY) {
qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
@ -993,6 +1011,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
} }
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock == NULL) {
continue;
}
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
@ -1059,6 +1081,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
} }
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock == NULL) {
continue;
}
if (pDataBlock->info.type == STREAM_CHECKPOINT) { if (pDataBlock->info.type == STREAM_CHECKPOINT) {
continue; continue;
} }
@ -1110,6 +1136,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
} }
SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index);
if (pExisted == NULL) {
continue;
}
code = doMergeExistedRows(pExisted, &tbData, id); code = doMergeExistedRows(pExisted, &tbData, id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
@ -1137,6 +1167,10 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* p = taosArrayGet(pBlocks, i); SSDataBlock* p = taosArrayGet(pBlocks, i);
if (p == NULL) {
continue;
}
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
return false; return false;
} }

View File

@ -357,9 +357,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i); STaskId* pTaskId = taosArrayGet(pTaskList, i);
if (pTaskId == NULL) {
continue;
}
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask); int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL || code != 0) {
continue; continue;
} }