Merge pull request #27149 from taosdata/fix/3_liaohj

fix(stream): update return value check.
This commit is contained in:
Haojun Liao 2024-08-12 13:10:00 +08:00 committed by GitHub
commit 354c5d2823
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 16 deletions

View File

@ -126,7 +126,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; terrno = code;
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
tstrerror(code)); tstrerror(code));
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);

View File

@ -419,13 +419,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
int32_t nullIndex = 0; int32_t nullIndex = 0;
int32_t dataIndex = 0; int32_t dataIndex = 0;
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); if (nullIndex >= numOfNULL) {
if (pos == NULL) {
continue;
}
if (nullIndex >= numOfNULL || i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
@ -433,14 +428,34 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++; dataIndex++;
} else { } else {
pFullSchema[i].bytes = 0; SColLocation *pos = NULL;
pFullSchema[i].colId = pos->colId; if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
pFullSchema[i].flags = COL_SET_NULL; pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); }
pFullSchema[i].type = pos->type;
nullIndex++; if (pos == NULL) {
mError("invalid null column index, %d", nullIndex);
continue;
}
if (i < pos->slotId) {
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
dataIndex++;
} else {
pFullSchema[i].bytes = 0;
pFullSchema[i].colId = pos->colId;
pFullSchema[i].flags = COL_SET_NULL;
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
pFullSchema[i].type = pos->type;
nullIndex++;
}
} }
} }
taosMemoryFree(pObj->outputSchema.pSchema); taosMemoryFree(pObj->outputSchema.pSchema);
pObj->outputSchema.pSchema = pFullSchema; pObj->outputSchema.pSchema = pFullSchema;
} }

View File

@ -1244,8 +1244,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
// discard the rsp, since it is expired. // discard the rsp, since it is expired.
if (req.startTs < pTask->execInfo.created) { if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
" from task createTs:%" PRId64 ", discard", " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs); pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs, pTask->execInfo.created);
streamMetaAddFailedTaskSelf(pTask, now); streamMetaAddFailedTaskSelf(pTask, now);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;