fix:[TD-30306]error in converity scan
This commit is contained in:
parent
d7ecd98cf9
commit
7182e0948e
|
@ -939,7 +939,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
|
|||
pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SFieldWithOptions));
|
||||
for (int32_t i = 0; i < pReq.numOfColumns; ++i) {
|
||||
SField *pField = taosArrayGet(pColumns, i);
|
||||
SFieldWithOptions fieldWithOption;
|
||||
SFieldWithOptions fieldWithOption = {0};
|
||||
setFieldWithOptions(&fieldWithOption, pField);
|
||||
setDefaultOptionsForField(&fieldWithOption);
|
||||
taosArrayPush(pReq.pColumns, &fieldWithOption);
|
||||
|
|
|
@ -233,7 +233,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
|||
|
||||
SSmlKv kvTs = {0};
|
||||
smlBuildTsKv(&kvTs, ts);
|
||||
if (needConverTime) {
|
||||
if (needConverTime && info->currSTableMeta != NULL) {
|
||||
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
|
||||
}
|
||||
|
||||
|
|
|
@ -1344,6 +1344,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
if(pMsg->pData == NULL){
|
||||
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
|
||||
if (pRspWrapper == NULL) {
|
||||
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
|
||||
|
@ -1356,11 +1362,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
goto END;
|
||||
}
|
||||
|
||||
if(pMsg->pData == NULL){
|
||||
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId);
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
|
||||
int32_t clientEpoch = atomic_load_32(&tmq->epoch);
|
||||
if (msgEpoch < clientEpoch) {
|
||||
|
@ -2809,8 +2810,6 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
|
|||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(sendInfo);
|
||||
tsem_destroy(&pParam->sem);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
|
@ -3235,8 +3234,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
|||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(msg);
|
||||
taosMemoryFree(sendInfo);
|
||||
tsem_destroy(&pParam->sem);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue