fix(query): fix race condition for table group list.
This commit is contained in:
parent
20cdb64cdd
commit
2bb9dc609a
|
@ -1241,6 +1241,29 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doInitReader(STableScanInfo* pInfo, SExecTaskInfo* pTaskInfo, SStorageAPI* pAPI, int32_t* pNum,
|
||||
STableKeyInfo** pList) {
|
||||
const char* idStr = GET_TASKID(pTaskInfo);
|
||||
int32_t code = initNextGroupScan(pInfo, pList, pNum);
|
||||
if (code) {
|
||||
qError("%s failed to init groupScan Info, code:%s at line:%d", idStr, tstrerror(code), __LINE__);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pInfo->base.dataReader != NULL) {
|
||||
qError("%s tsdb reader should be null", idStr);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, *pList, *pNum, pInfo->pResBlock,
|
||||
(void**)&pInfo->base.dataReader, idStr, &pInfo->pIgnoreTables);
|
||||
if (code) {
|
||||
qError("%s failed to open tsdbReader, code:%s at line:%d", idStr, tstrerror(code), __LINE__);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -1250,6 +1273,7 @@ static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
SSDataBlock* pResult = NULL;
|
||||
const char* idStr = GET_TASKID(pTaskInfo);
|
||||
|
||||
QRY_PARAM_CHECK(pResBlock);
|
||||
|
||||
|
@ -1260,17 +1284,10 @@ static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
}
|
||||
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
code = initNextGroupScan(pInfo, &pList, &num);
|
||||
|
||||
code = doInitReader(pInfo, pTaskInfo, pAPI, &num, &pList);
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
QUERY_CHECK_CONDITION((pInfo->base.dataReader == NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (pInfo->filesetDelimited) {
|
||||
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
|
||||
}
|
||||
|
@ -1280,7 +1297,6 @@ static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
}
|
||||
}
|
||||
|
||||
pResult = NULL;
|
||||
code = doGroupedTableScan(pOperator, &pResult);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1305,7 +1321,7 @@ static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlo
|
|||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
qError("%s %s failed at line %d since %s", idStr, __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue