Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-31372
This commit is contained in:
commit
2c281fdb37
|
@ -126,7 +126,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
|
dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
|
||||||
int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
|
int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno != 0) code = terrno;
|
terrno = code;
|
||||||
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
vmSendRsp(pMsg, code);
|
vmSendRsp(pMsg, code);
|
||||||
|
|
|
@ -387,7 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
||||||
rpcInit.supportBatch = 1;
|
rpcInit.supportBatch = 1;
|
||||||
rpcInit.batchSize = 8 * 1024;
|
rpcInit.batchSize = 8 * 1024;
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
rpcInit.notWaitAvaliableConn = 1;
|
rpcInit.notWaitAvaliableConn = 0;
|
||||||
|
|
||||||
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
(void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
|
|
||||||
|
|
|
@ -419,13 +419,26 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
|
|
||||||
int32_t nullIndex = 0;
|
int32_t nullIndex = 0;
|
||||||
int32_t dataIndex = 0;
|
int32_t dataIndex = 0;
|
||||||
for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) {
|
||||||
SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
if (nullIndex >= numOfNULL) {
|
||||||
|
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||||
|
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||||
|
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
||||||
|
strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name);
|
||||||
|
pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type;
|
||||||
|
dataIndex++;
|
||||||
|
} else {
|
||||||
|
SColLocation *pos = NULL;
|
||||||
|
if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) {
|
||||||
|
pos = taosArrayGet(pCreate->fillNullCols, nullIndex);
|
||||||
|
}
|
||||||
|
|
||||||
if (pos == NULL) {
|
if (pos == NULL) {
|
||||||
|
mError("invalid null column index, %d", nullIndex);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nullIndex >= numOfNULL || i < pos->slotId) {
|
if (i < pos->slotId) {
|
||||||
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes;
|
||||||
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId;
|
||||||
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags;
|
||||||
|
@ -441,6 +454,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
nullIndex++;
|
nullIndex++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pObj->outputSchema.pSchema);
|
taosMemoryFree(pObj->outputSchema.pSchema);
|
||||||
pObj->outputSchema.pSchema = pFullSchema;
|
pObj->outputSchema.pSchema = pFullSchema;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1244,8 +1244,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
// discard the rsp, since it is expired.
|
// discard the rsp, since it is expired.
|
||||||
if (req.startTs < pTask->execInfo.created) {
|
if (req.startTs < pTask->execInfo.created) {
|
||||||
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
|
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
|
||||||
" from task createTs:%" PRId64 ", discard",
|
" from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard",
|
||||||
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs);
|
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs, pTask->execInfo.created);
|
||||||
streamMetaAddFailedTaskSelf(pTask, now);
|
streamMetaAddFailedTaskSelf(pTask, now);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -384,6 +384,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
|
||||||
bool freeOnFail) {
|
bool freeOnFail) {
|
||||||
LRUStatus status = TAOS_LRU_STATUS_OK;
|
LRUStatus status = TAOS_LRU_STATUS_OK;
|
||||||
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
|
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
|
||||||
|
if (!lastReferenceList) {
|
||||||
|
return TAOS_LRU_STATUS_FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&shard->mutex);
|
(void)taosThreadMutexLock(&shard->mutex);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue