Merge pull request #18060 from taosdata/fix/set_offset_for_empty_record
fix(query): set offset for empty record
This commit is contained in:
commit
b33d0e05f7
|
@ -418,6 +418,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
||||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->version);
|
buf = taosDecodeVariantI32(buf, &pSW->version);
|
||||||
|
if (pSW->nCols > 0) {
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) {
|
if (pSW->pSchema == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -426,6 +427,9 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
pSW->pSchema = NULL;
|
||||||
|
}
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
||||||
if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
||||||
|
pSubTopicEp->schema.nCols = 0;
|
||||||
taosArrayDestroy(pSubTopicEp->vgs);
|
taosArrayDestroy(pSubTopicEp->vgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1773,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||||
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||||
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
|
||||||
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
|
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
|
||||||
|
|
Loading…
Reference in New Issue