Merge branch 'main' of github.com:taosdata/TDengine into szhou/cenc
This commit is contained in:
commit
8e8b7d1014
|
@ -132,7 +132,8 @@ int32_t metaCacheOpen(SMeta* pMeta) {
|
||||||
goto _err2;
|
goto _err2;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->sTagFilterResCache.pTableEntry = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
pCache->sTagFilterResCache.pTableEntry =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
|
||||||
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
|
if (pCache->sTagFilterResCache.pTableEntry == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _err2;
|
goto _err2;
|
||||||
|
@ -419,7 +420,8 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {
|
int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
|
||||||
|
bool* acquireRes) {
|
||||||
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
uint64_t* pBuf = pMeta->pCache->sTagFilterResCache.keyBuf;
|
||||||
|
|
||||||
// generate the composed key for LRU cache
|
// generate the composed key for LRU cache
|
||||||
|
@ -470,9 +472,12 @@ int32_t metaGetCachedTableUidList(SMeta* pMeta, tb_uid_t suid, const uint8_t* pK
|
||||||
for (int32_t i = 0; i < s; ++i) {
|
for (int32_t i = 0; i < s; ++i) {
|
||||||
SListNode** p1 = taosArrayGet(pList, i);
|
SListNode** p1 = taosArrayGet(pList, i);
|
||||||
tdListPopNode(&(*pEntry)->list, *p1);
|
tdListPopNode(&(*pEntry)->list, *p1);
|
||||||
|
taosMemoryFree(*p1);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->qTimes = 0; // reset the query times
|
(*pEntry)->qTimes = 0; // reset the query times
|
||||||
|
|
||||||
|
taosArrayDestroy(pList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -487,7 +492,8 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check both the payload size and selectivity ratio
|
// check both the payload size and selectivity ratio
|
||||||
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio) {
|
int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
|
||||||
|
int32_t payloadLen, double selectivityRatio) {
|
||||||
if (selectivityRatio > tsSelectivityRatio) {
|
if (selectivityRatio > tsSelectivityRatio) {
|
||||||
metaDebug("vgId:%d, suid:%" PRIu64
|
metaDebug("vgId:%d, suid:%" PRIu64
|
||||||
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
|
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
|
||||||
|
@ -525,9 +531,10 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
|
||||||
ASSERT(sizeof(uint64_t) + keyLen == 24);
|
ASSERT(sizeof(uint64_t) + keyLen == 24);
|
||||||
|
|
||||||
// add to cache.
|
// add to cache.
|
||||||
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL, TAOS_LRU_PRIORITY_LOW);
|
taosLRUCacheInsert(pCache, pBuf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
|
||||||
metaDebug("vgId:%d, suid:%"PRIu64" list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode),
|
TAOS_LRU_PRIORITY_LOW);
|
||||||
suid, (int32_t) taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", TD_VID(pMeta->pVnode), suid,
|
||||||
|
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,7 +314,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = j - num;
|
int32_t rowIndex = j - num;
|
||||||
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
|
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
|
||||||
|
pOperator->exprSupp.numOfExprs);
|
||||||
|
|
||||||
// assign the group keys or user input constant values if required
|
// assign the group keys or user input constant values if required
|
||||||
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||||
|
@ -331,7 +332,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = pBlock->info.rows - num;
|
int32_t rowIndex = pBlock->info.rows - num;
|
||||||
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
|
applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows,
|
||||||
|
pOperator->exprSupp.numOfExprs);
|
||||||
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -469,8 +471,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo,
|
||||||
createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, optrDefaultBufFn, NULL);
|
optrDefaultBufFn, NULL);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -776,6 +778,12 @@ static void destroyPartitionOperatorInfo(void* param) {
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pGroupColVals);
|
taosArrayDestroy(pInfo->pGroupColVals);
|
||||||
taosMemoryFree(pInfo->keyBuf);
|
taosMemoryFree(pInfo->keyBuf);
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
|
||||||
|
taosArrayDestroy(pGp->pPageList);
|
||||||
|
}
|
||||||
taosArrayDestroy(pInfo->sortedGroupArray);
|
taosArrayDestroy(pInfo->sortedGroupArray);
|
||||||
|
|
||||||
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
||||||
|
@ -850,7 +858,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
pOperator->fpSet =
|
||||||
|
createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -1141,8 +1150,8 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
pInfo, pTaskInfo);
|
pInfo, pTaskInfo);
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
|
||||||
createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -49,10 +49,10 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
|
||||||
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
||||||
pkey->isNull = false;
|
pkey->isNull = false;
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
if (!IS_VAR_DATA_TYPE(pkey->type)) {
|
if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
memcpy(pkey->pData, val, pkey->bytes);
|
|
||||||
} else {
|
|
||||||
memcpy(pkey->pData, val, varDataLen(val));
|
memcpy(pkey->pData, val, varDataLen(val));
|
||||||
|
} else {
|
||||||
|
memcpy(pkey->pData, val, pkey->bytes);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pkey->isNull = true;
|
pkey->isNull = true;
|
||||||
|
@ -98,13 +98,26 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
|
||||||
ASSERT(IS_MATHABLE_TYPE(pColInfoData->info.type));
|
ASSERT(IS_MATHABLE_TYPE(pColInfoData->info.type));
|
||||||
|
|
||||||
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
||||||
memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
|
char* p = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
|
||||||
|
memcpy(pLinearInfo->start.val, p, varDataTLen(p));
|
||||||
|
} else {
|
||||||
|
memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pLinearInfo->isStartSet = true;
|
pLinearInfo->isStartSet = true;
|
||||||
} else if (!pLinearInfo->isEndSet) {
|
} else if (!pLinearInfo->isEndSet) {
|
||||||
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
||||||
pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
||||||
memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
|
|
||||||
|
char* p = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
|
||||||
|
memcpy(pLinearInfo->end.val, p, varDataTLen(p));
|
||||||
|
} else {
|
||||||
|
memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pLinearInfo->isEndSet = true;
|
pLinearInfo->isEndSet = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -113,7 +126,15 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
|
||||||
|
|
||||||
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
if (!colDataIsNull_s(pColInfoData, rowIndex)) {
|
||||||
pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
|
||||||
memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes);
|
|
||||||
|
char* p = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
|
||||||
|
memcpy(pLinearInfo->end.val, p, varDataTLen(p));
|
||||||
|
} else {
|
||||||
|
memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
pLinearInfo->end.key = INT64_MIN;
|
pLinearInfo->end.key = INT64_MIN;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1626,9 +1626,9 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
pCtx->retryNextInterval = pCtx->retryMaxInterval;
|
pCtx->retryNextInterval = pCtx->retryMaxInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
|
// if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
|
||||||
return false;
|
// return false;
|
||||||
}
|
// }
|
||||||
} else {
|
} else {
|
||||||
pCtx->retryNextInterval = 0;
|
pCtx->retryNextInterval = 0;
|
||||||
pCtx->epsetRetryCnt++;
|
pCtx->epsetRetryCnt++;
|
||||||
|
|
|
@ -801,7 +801,7 @@ bool taosAssert(bool condition, const char *file, int32_t line, const char *form
|
||||||
taosPrintTrace(flags, level, dflag);
|
taosPrintTrace(flags, level, dflag);
|
||||||
|
|
||||||
if (tsAssert) {
|
if (tsAssert) {
|
||||||
taosCloseLog();
|
// taosCloseLog();
|
||||||
taosMsleep(300);
|
taosMsleep(300);
|
||||||
|
|
||||||
#ifdef NDEBUG
|
#ifdef NDEBUG
|
||||||
|
|
Loading…
Reference in New Issue