refactor: do some internal refactor.
This commit is contained in:
parent
1adf59e7d8
commit
b0f68a9147
|
@ -1637,8 +1637,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
|
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
|
||||||
reallocVarDataVal(&lastCol.rowKey.pks[i]);
|
reallocVarDataVal(&lastCol.rowKey.pks[j]);
|
||||||
}
|
}
|
||||||
reallocVarData(&lastCol.colVal);
|
reallocVarData(&lastCol.colVal);
|
||||||
taosArrayPush(pLastArray, &lastCol);
|
taosArrayPush(pLastArray, &lastCol);
|
||||||
|
@ -1667,8 +1667,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
|
||||||
|
|
||||||
SLastCol lastCol = *pLastCol;
|
SLastCol lastCol = *pLastCol;
|
||||||
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
|
for (int8_t j = 0; j < lastCol.rowKey.numOfPKs; j++) {
|
||||||
reallocVarDataVal(&lastCol.rowKey.pks[i]);
|
reallocVarDataVal(&lastCol.rowKey.pks[j]);
|
||||||
}
|
}
|
||||||
reallocVarData(&lastCol.colVal);
|
reallocVarData(&lastCol.colVal);
|
||||||
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
taosArraySet(pLastArray, idxKey->idx, &lastCol);
|
||||||
|
|
|
@ -217,6 +217,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
SCacheRowsScanInfo* pInfo = pOperator->info;
|
SCacheRowsScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STableListInfo* pTableList = pInfo->pTableList;
|
STableListInfo* pTableList = pInfo->pTableList;
|
||||||
|
SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn;
|
||||||
|
|
||||||
uint64_t suid = tableListGetSuid(pTableList);
|
uint64_t suid = tableListGetSuid(pTableList);
|
||||||
int32_t size = tableListGetSize(pTableList);
|
int32_t size = tableListGetSize(pTableList);
|
||||||
|
@ -237,8 +238,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
blockDataCleanup(pInfo->pBufferedRes);
|
blockDataCleanup(pInfo->pBufferedRes);
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes,
|
int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
|
||||||
pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
|
pInfo->pDstSlotIds, pInfo->pUidList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -307,10 +308,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == pInfo->pLastrowReader) {
|
if (NULL == pInfo->pLastrowReader) {
|
||||||
code = pInfo->readHandle.api.cacheFn.openReader(
|
code = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
|
||||||
pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList),
|
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
|
||||||
pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList,
|
&pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol,
|
||||||
&pInfo->pkCol, pInfo->numOfPks);
|
pInfo->numOfPks);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pInfo->currentGroupIndex += 1;
|
pInfo->currentGroupIndex += 1;
|
||||||
|
@ -318,13 +319,13 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pInfo->readHandle.api.cacheFn.reuseReader(pInfo->pLastrowReader, pList, num);
|
pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(pInfo->pUidList);
|
taosArrayClear(pInfo->pUidList);
|
||||||
|
|
||||||
code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
|
code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
|
||||||
pInfo->pUidList);
|
pInfo->pUidList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -357,7 +358,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pLastrowReader = pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
|
pInfo->pLastrowReader = pReaderFn->closeReader(pInfo->pLastrowReader);
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2441,7 +2441,7 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
|
||||||
int32_t rowLen = 0;
|
int32_t rowLen = 0;
|
||||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
rowLen += pc->resDataInfo.interBufSize;
|
rowLen += pc->pExpr->base.resSchema.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||||
|
|
Loading…
Reference in New Issue