fix(stream): update return value check.
This commit is contained in:
parent
29a6b072a8
commit
5ff6397416
|
@ -419,13 +419,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
|
||||
int32_t nullIndex = 0;
|
||||
int32_t dataIndex = 0;
|
||||
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
||||
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
||||
if (pos == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nullIndex >= numOfNULL || i < pos->slotId) {
|
||||
for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
||||
if (nullIndex >= numOfNULL) {
|
||||
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
||||
|
@ -433,14 +428,34 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
|
||||
dataIndex++;
|
||||
} else {
|
||||
pFullSchema[i].bytes = 0;
|
||||
pFullSchema[i].colId = pos->colId;
|
||||
pFullSchema[i].flags = COL_SET_NULL;
|
||||
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
|
||||
pFullSchema[i].type = pos->type;
|
||||
nullIndex++;
|
||||
SColLocation *pos = NULL;
|
||||
if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
|
||||
pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
||||
}
|
||||
|
||||
if (pos == NULL) {
|
||||
mError("invalid null column index, %d", nullIndex);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (i < pos->slotId) {
|
||||
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
||||
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
|
||||
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
|
||||
dataIndex++;
|
||||
} else {
|
||||
pFullSchema[i].bytes = 0;
|
||||
pFullSchema[i].colId = pos->colId;
|
||||
pFullSchema[i].flags = COL_SET_NULL;
|
||||
memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN);
|
||||
pFullSchema[i].type = pos->type;
|
||||
nullIndex++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pObj->outputSchema.pSchema);
|
||||
pObj->outputSchema.pSchema = pFullSchema;
|
||||
}
|
||||
|
|
|
@ -1244,8 +1244,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
// discard the rsp, since it is expired.
|
||||
if (req.startTs < pTask->execInfo.created) {
|
||||
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
|
||||
" from task createTs:%" PRId64 ", discard",
|
||||
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs);
|
||||
" from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
|
||||
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs, pTask->execInfo.created);
|
||||
streamMetaAddFailedTaskSelf(pTask, now);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue