tsdb/suspend: fix table block scan info
This commit is contained in:
parent
020b79a094
commit
5024fc1e9f
|
@ -4001,7 +4001,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
if (pStatus->loadFromFile) {
|
if (pStatus->loadFromFile) {
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
if (pBlockInfo != NULL) {
|
if (pBlockInfo != NULL) {
|
||||||
pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
pBlockScanInfo =
|
||||||
|
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
if (pBlockScanInfo == NULL) {
|
if (pBlockScanInfo == NULL) {
|
||||||
code = TSDB_CODE_INVALID_PARA;
|
code = TSDB_CODE_INVALID_PARA;
|
||||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||||
|
@ -4015,20 +4016,28 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||||
|
|
||||||
// resetDataBlockScanInfo excluding lastKey
|
// resetDataBlockScanInfo excluding lastKey
|
||||||
STableBlockScanInfo* p = NULL;
|
STableBlockScanInfo** p = NULL;
|
||||||
|
|
||||||
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
|
||||||
p->iterInit = false;
|
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
|
||||||
p->iiter.hasVal = false;
|
|
||||||
if (p->iter.iter != NULL) {
|
pInfo->iterInit = false;
|
||||||
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
pInfo->iter.hasVal = false;
|
||||||
|
pInfo->iiter.hasVal = false;
|
||||||
|
|
||||||
|
if (pInfo->iter.iter != NULL) {
|
||||||
|
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
if (pInfo->iiter.iter != NULL) {
|
||||||
// p->lastKey = ts;
|
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||||
|
// pInfo->lastKey = ts;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pBlockScanInfo = *pStatus->pTableIter;
|
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
|
||||||
if (pBlockScanInfo) {
|
if (pBlockScanInfo) {
|
||||||
// save lastKey to restore memory iterator
|
// save lastKey to restore memory iterator
|
||||||
STimeWindow w = pReader->pResBlock->info.window;
|
STimeWindow w = pReader->pResBlock->info.window;
|
||||||
|
@ -4053,10 +4062,12 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
|
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
|
||||||
|
pReader->pReadSnap = NULL;
|
||||||
|
|
||||||
pReader->suspended = true;
|
pReader->suspended = true;
|
||||||
|
|
||||||
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo->uid, pReader->idStr);
|
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
|
||||||
|
pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
|
@ -191,13 +191,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
void *pReq;
|
void *pReq;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
|
/*
|
||||||
if (!pVnode->inUse) {
|
if (!pVnode->inUse) {
|
||||||
terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
|
terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
|
||||||
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
|
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
if (version <= pVnode->state.applied) {
|
if (version <= pVnode->state.applied) {
|
||||||
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
|
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
|
||||||
pVnode->state.applied);
|
pVnode->state.applied);
|
||||||
|
@ -1001,7 +1001,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid; // update uid if table exist for using below
|
pSubmitTbData->uid = pSubmitTbData->pCreateTbReq->uid; // update uid if table exist for using below
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue