fix:optimize log & change the length of tag if tag is null in schemaless
This commit is contained in:
parent
9493ecd18f
commit
c40c695ea7
|
@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
|
|||
pReq.numOfTags = 1;
|
||||
SField field = {0};
|
||||
field.type = TSDB_DATA_TYPE_NCHAR;
|
||||
field.bytes = 1;
|
||||
field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
|
||||
strcpy(field.name, tsSmlTagName);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}
|
||||
|
|
|
@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
|||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code));
|
||||
dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
dGTrace("vnodeProcessFetchMsg vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
|
|
@ -296,10 +296,9 @@ void tqCloseReader(STqReader* pReader) {
|
|||
|
||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
|
||||
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
||||
tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
|
||||
return -1;
|
||||
}
|
||||
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id);
|
||||
tqDebug("tmq poll: wal reader seek to ver success ver:%"PRId64" %s", ver, id);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -1062,6 +1062,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
if((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)){
|
||||
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
qDebug("set the submit block for future scan");
|
||||
|
@ -1102,7 +1103,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
// let's seek to the next version in wal file
|
||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||
qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id);
|
||||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
|
@ -1125,6 +1125,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
} else {
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
qError("no table in table list, %s", id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1143,6 +1144,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
} else {
|
||||
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
|
||||
numOfTables, pScanInfo->currentTable, id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1175,6 +1177,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pScanBaseInfo->cond.twindows.skey = oldSkey;
|
||||
} else {
|
||||
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1189,6 +1192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1224,6 +1228,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
SSnapContext* sContext = pInfo->sContext;
|
||||
if (setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id);
|
||||
|
|
|
@ -999,6 +999,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) {
|
||||
if (pOperator == NULL) {
|
||||
qError("invalid operator, failed to find tableScanOperator %s", id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1007,6 +1008,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con
|
|||
} else {
|
||||
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
||||
qError("invalid operator, failed to find tableScanOperator %s", id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -207,17 +207,12 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// pReader->curInvalid = 1;
|
||||
// pReader->curVersion = ver;
|
||||
|
||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
|
||||
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||
wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
|
||||
ver, pWal->vers.firstVer, pWal->vers.lastVer);
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
// if (ver < pWal->vers.snapshotVer) {
|
||||
// }
|
||||
|
||||
if (walReadSeekVerImpl(pReader, ver) < 0) {
|
||||
return -1;
|
||||
|
@ -236,8 +231,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
|
||||
if (pRead->curVersion != fetchVer) {
|
||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||
// pRead->curVersion = fetchVer;
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
seeked = true;
|
||||
|
@ -256,7 +249,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
|||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
// pRead->curInvalid = 1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue