fix(stream): add lock when retrieving info from the tableGroup struct

This commit is contained in:
Haojun Liao 2024-04-11 22:54:35 +08:00
parent a951af2492
commit f2ccb8aa7e
1 changed files with 5 additions and 4 deletions

View File

@ -708,9 +708,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size); tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, pKeyInfo, size);
pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo);
pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1);
pInfo->pResBlock->info.blankFill = false; pInfo->pResBlock->info.blankFill = false;
if (!pInfo->needCountEmptyTable) { if (!pInfo->needCountEmptyTable) {
@ -1011,8 +1009,8 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
@ -1020,7 +1018,10 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
taosRLockLatch(&pTaskInfo->lock);
initNextGroupScan(pInfo, &pList, &num); initNextGroupScan(pInfo, &pList, &num);
taosRUnLockLatch(&pTaskInfo->lock);
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,