fix meta dead lock
This commit is contained in:
parent
f9c8b6cd7b
commit
9b418bee78
|
@ -90,7 +90,7 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order, bool* overlap) {
|
static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order, bool* overlap) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
// 0 by default, which means it is not a interval operator of the upstream operator.
|
// 0 by default, which means it is not a interval operator of the upstream operator.
|
||||||
|
@ -101,7 +101,7 @@ static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBloc
|
||||||
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.skey);
|
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.skey);
|
||||||
if(w.ekey < pBlockInfo->window.skey) {
|
if (w.ekey < pBlockInfo->window.skey) {
|
||||||
qError("w.ekey:%" PRId64 " < pBlockInfo->window.skey:%" PRId64, w.ekey, pBlockInfo->window.skey);
|
qError("w.ekey:%" PRId64 " < pBlockInfo->window.skey:%" PRId64, w.ekey, pBlockInfo->window.skey);
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBloc
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(w.ekey <= pBlockInfo->window.ekey) {
|
if (w.ekey <= pBlockInfo->window.ekey) {
|
||||||
qError("w.ekey:%" PRId64 " <= pBlockInfo->window.ekey:%" PRId64, w.ekey, pBlockInfo->window.ekey);
|
qError("w.ekey:%" PRId64 " <= pBlockInfo->window.ekey:%" PRId64, w.ekey, pBlockInfo->window.ekey);
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -128,7 +128,7 @@ static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBloc
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.ekey);
|
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.ekey);
|
||||||
if(w.skey > pBlockInfo->window.ekey) {
|
if (w.skey > pBlockInfo->window.ekey) {
|
||||||
qError("w.skey:%" PRId64 " > pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.ekey);
|
qError("w.skey:%" PRId64 " > pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.ekey);
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBloc
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(w.skey >= pBlockInfo->window.skey){
|
if (w.skey >= pBlockInfo->window.skey) {
|
||||||
qError("w.skey:%" PRId64 " >= pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.skey);
|
qError("w.skey:%" PRId64 " >= pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.skey);
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -227,7 +227,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
|
||||||
|
|
||||||
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
|
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
|
||||||
|
|
||||||
EFuncDataRequired reqStatus = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo);
|
EFuncDataRequired reqStatus = fmFuncDynDataRequired(functionId, pEntry, pBlockInfo);
|
||||||
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
|
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
notLoadBlock = false;
|
notLoadBlock = false;
|
||||||
break;
|
break;
|
||||||
|
@ -399,7 +399,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(*status != FUNC_DATA_REQUIRED_DATA_LOAD) {
|
if (*status != FUNC_DATA_REQUIRED_DATA_LOAD) {
|
||||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
qError("%s loadDataBlock invalid status:%d", GET_TASKID(pTaskInfo), *status);
|
qError("%s loadDataBlock invalid status:%d", GET_TASKID(pTaskInfo), *status);
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
@ -588,14 +588,14 @@ static void freeTableCachedValObj(STableCachedVal* pVal) {
|
||||||
|
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
|
||||||
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache) {
|
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
bool freeReader = false;
|
bool freeReader = false;
|
||||||
LRUHandle* h = NULL;
|
LRUHandle* h = NULL;
|
||||||
STableCachedVal val = {0};
|
STableCachedVal val = {0};
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
const char* idStr = pTask->id.str;
|
const char* idStr = pTask->id.str;
|
||||||
int32_t insertRet = TAOS_LRU_STATUS_OK;
|
int32_t insertRet = TAOS_LRU_STATUS_OK;
|
||||||
STableCachedVal* pVal = NULL;
|
STableCachedVal* pVal = NULL;
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
|
@ -625,8 +625,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
// append null value before return to caller, since the caller will ignore this error code and proceed
|
// append null value before return to caller, since the caller will ignore this error code and proceed
|
||||||
doSetNullValue(pBlock, pExpr, numOfExpr);
|
doSetNullValue(pBlock, pExpr, numOfExpr);
|
||||||
} else {
|
} else {
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(code),
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(code), idStr);
|
||||||
idStr);
|
|
||||||
}
|
}
|
||||||
pHandle->api.metaReaderFn.clearReader(&mr);
|
pHandle->api.metaReaderFn.clearReader(&mr);
|
||||||
return code;
|
return code;
|
||||||
|
@ -743,7 +742,7 @@ _end:
|
||||||
|
|
||||||
if (NULL != pVal) {
|
if (NULL != pVal) {
|
||||||
insertRet = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
|
insertRet = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
|
||||||
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
|
||||||
if (insertRet != TAOS_LRU_STATUS_OK) {
|
if (insertRet != TAOS_LRU_STATUS_OK) {
|
||||||
qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr);
|
qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr);
|
||||||
}
|
}
|
||||||
|
@ -1416,7 +1415,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr
|
||||||
if (!pRecorder) {
|
if (!pRecorder) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
STableScanInfo* pTableScanInfo = pOptr->info;
|
STableScanInfo* pTableScanInfo = pOptr->info;
|
||||||
*pRecorder = pTableScanInfo->base.readRecorder;
|
*pRecorder = pTableScanInfo->base.readRecorder;
|
||||||
*pOptrExplain = pRecorder;
|
*pOptrExplain = pRecorder;
|
||||||
*len = sizeof(SFileBlockLoadRecorder);
|
*len = sizeof(SFileBlockLoadRecorder);
|
||||||
|
@ -1499,7 +1498,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||||
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
||||||
|
|
||||||
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
|
code = prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
|
@ -1538,7 +1537,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
pInfo->base.pTableListInfo = NULL; // this attribute will be destroy outside of this function
|
pInfo->base.pTableListInfo = NULL; // this attribute will be destroy outside of this function
|
||||||
destroyTableScanOperatorInfo(pInfo);
|
destroyTableScanOperatorInfo(pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1660,7 +1659,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
STsdbReader* pReader = NULL;
|
STsdbReader* pReader = NULL;
|
||||||
code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
||||||
(void**)&pReader, GET_TASKID(pTaskInfo), NULL);
|
(void**)&pReader, GET_TASKID(pTaskInfo), NULL);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
|
@ -1813,7 +1812,7 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
|
||||||
QUERY_CHECK_NULL(pInfo->pUpdateInfo, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
QUERY_CHECK_NULL(pInfo->pUpdateInfo, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey,
|
qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey,
|
||||||
pInfo->pUpdateInfo->maxDataVersion);
|
pInfo->pUpdateInfo->maxDataVersion);
|
||||||
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
|
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
|
||||||
pInfo->pTableScanOp->status = OP_OPENED;
|
pInfo->pTableScanOp->status = OP_OPENED;
|
||||||
if (pRes) {
|
if (pRes) {
|
||||||
|
@ -2236,9 +2235,9 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
|
||||||
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||||
int64_t ver = pSrcBlock->info.version - 1;
|
int64_t ver = pSrcBlock->info.version - 1;
|
||||||
|
|
||||||
if (pInfo->partitionSup.needCalc &&
|
if (pInfo->partitionSup.needCalc &&
|
||||||
(srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA))) {
|
(srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA))) {
|
||||||
|
@ -2804,8 +2803,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr,
|
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
|
pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
|
||||||
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -3182,7 +3181,7 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||||
if (!pUpInfo) {
|
if (!pUpInfo) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -3471,7 +3470,8 @@ FETCH_NEXT_BLOCK:
|
||||||
(*ppRes) = pInfo->pDeleteDataRes;
|
(*ppRes) = pInfo->pDeleteDataRes;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino);
|
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
|
||||||
|
lino);
|
||||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
} break;
|
} break;
|
||||||
|
@ -3484,7 +3484,8 @@ FETCH_NEXT_BLOCK:
|
||||||
(*ppRes) = pInfo->pUpdateRes;
|
(*ppRes) = pInfo->pUpdateRes;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino);
|
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
|
||||||
|
lino);
|
||||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
} break;
|
} break;
|
||||||
|
@ -3671,7 +3672,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||||
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
||||||
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
|
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4066,7 +4067,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
||||||
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
||||||
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
||||||
QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
|
QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
|
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||||
QUERY_CHECK_NULL(id, code, lino, _error, terrno);
|
QUERY_CHECK_NULL(id, code, lino, _error, terrno);
|
||||||
|
@ -4268,7 +4269,7 @@ _error:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
|
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
|
||||||
SStorageAPI* pAPI) {
|
SStorageAPI* pAPI) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -4280,7 +4281,6 @@ static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pR
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
tDecoderClear(&(*mr).coder);
|
tDecoderClear(&(*mr).coder);
|
||||||
pAPI->metaReaderFn.clearReader(mr);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4289,7 +4289,6 @@ static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pR
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
pAPI->metaReaderFn.clearReader(mr);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4697,6 +4696,12 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
|
|
||||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||||
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
||||||
|
if (code == TSDB_CODE_OUT_OF_MEMORY) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// ignore other error
|
||||||
|
}
|
||||||
|
|
||||||
++count;
|
++count;
|
||||||
if (++pInfo->curPos >= size) {
|
if (++pInfo->curPos >= size) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
|
@ -4814,7 +4819,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
|
||||||
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
|
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//TODO wjm check pInfo->filterCtx.code
|
// TODO wjm check pInfo->filterCtx.code
|
||||||
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdxNext : doTagScanFromMetaEntryNext;
|
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdxNext : doTagScanFromMetaEntryNext;
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
@ -4925,7 +4930,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
pInfo->base.dataReader = pInput->pReader;
|
pInfo->base.dataReader = pInput->pReader;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
bool hasNext = false;
|
bool hasNext = false;
|
||||||
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
|
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
|
||||||
|
@ -5036,7 +5041,7 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
||||||
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||||
int32_t code = setGroupStartEndIndex(pInfo);
|
int32_t code = setGroupStartEndIndex(pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
|
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
|
||||||
|
@ -5065,8 +5070,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
||||||
}
|
}
|
||||||
int32_t bufPageSize = pInfo->bufPageSize;
|
int32_t bufPageSize = pInfo->bufPageSize;
|
||||||
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
|
||||||
code =
|
code = createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
||||||
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosMemoryFree(pSubTblsInfo->aInputs);
|
taosMemoryFree(pSubTblsInfo->aInputs);
|
||||||
taosMemoryFree(pSubTblsInfo);
|
taosMemoryFree(pSubTblsInfo);
|
||||||
|
@ -5199,7 +5203,7 @@ static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo,
|
||||||
|
|
||||||
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
|
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
|
||||||
QUERY_CHECK_NULL(pSrcColInfo, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pSrcColInfo, code, lino, _end, terrno);
|
||||||
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
|
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
|
||||||
|
|
||||||
if (isNull) {
|
if (isNull) {
|
||||||
code = colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
|
code = colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
|
||||||
|
@ -5223,7 +5227,8 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t capacity, SSDataBlock** pResBlock) {
|
static int32_t getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t capacity,
|
||||||
|
SSDataBlock** pResBlock) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
@ -5580,9 +5585,9 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppSortArray) {
|
int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppSortArray) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno);
|
||||||
SBlockOrderInfo biTs = {0};
|
SBlockOrderInfo biTs = {0};
|
||||||
SBlockOrderInfo biPk = {0};
|
SBlockOrderInfo biPk = {0};
|
||||||
|
@ -5983,7 +5988,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
||||||
if (!execInfo) {
|
if (!execInfo) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
STableMergeScanInfo* pInfo = pOptr->info;
|
STableMergeScanInfo* pInfo = pOptr->info;
|
||||||
execInfo->blockRecorder = pInfo->base.readRecorder;
|
execInfo->blockRecorder = pInfo->base.readRecorder;
|
||||||
execInfo->sortExecInfo = pInfo->sortExecInfo;
|
execInfo->sortExecInfo = pInfo->sortExecInfo;
|
||||||
|
|
||||||
|
@ -6126,24 +6131,24 @@ _error:
|
||||||
|
|
||||||
// ====================================================================================================================
|
// ====================================================================================================================
|
||||||
// TableCountScanOperator
|
// TableCountScanOperator
|
||||||
static void destoryTableCountScanOperator(void* param);
|
static void destoryTableCountScanOperator(void* param);
|
||||||
static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||||
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
|
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
|
||||||
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
|
||||||
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
|
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
|
||||||
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
|
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
|
||||||
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
|
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
|
||||||
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes);
|
STableCountScanSupp* pSupp, SSDataBlock* pRes);
|
||||||
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
|
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
|
||||||
size_t perfdbTableNum);
|
size_t perfdbTableNum);
|
||||||
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
|
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
|
||||||
size_t infodbTableNum, size_t perfdbTableNum);
|
size_t infodbTableNum, size_t perfdbTableNum);
|
||||||
static const char* GROUP_TAG_DB_NAME = "db_name";
|
static const char* GROUP_TAG_DB_NAME = "db_name";
|
||||||
static const char* GROUP_TAG_STABLE_NAME = "stable_name";
|
static const char* GROUP_TAG_STABLE_NAME = "stable_name";
|
||||||
|
|
||||||
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
|
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
|
||||||
if (scanCols != NULL) {
|
if (scanCols != NULL) {
|
||||||
|
@ -6478,7 +6483,7 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
|
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -6526,7 +6531,7 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
|
||||||
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
|
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
Loading…
Reference in New Issue