Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-31538-5
This commit is contained in:
commit
fcd62b9691
|
@ -29,7 +29,7 @@ void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pObj);
|
||||||
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup);
|
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup);
|
||||||
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw);
|
SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup);
|
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup);
|
||||||
|
|
||||||
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup);
|
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||||
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup);
|
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||||
|
|
|
@ -102,8 +102,10 @@ void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
|
||||||
sdbRelease(pSdb, pGroup);
|
sdbRelease(pSdb, pGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
|
int32_t mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
|
||||||
ASSERT(pVgObj->replica == 2);
|
if (pVgObj->replica != 2) {
|
||||||
|
TAOS_RETURN(TSDB_CODE_INVALID_PARA);
|
||||||
|
}
|
||||||
(void)memset(outGroup, 0, sizeof(SArbGroup));
|
(void)memset(outGroup, 0, sizeof(SArbGroup));
|
||||||
outGroup->dbUid = pVgObj->dbUid;
|
outGroup->dbUid = pVgObj->dbUid;
|
||||||
outGroup->vgId = pVgObj->vgId;
|
outGroup->vgId = pVgObj->vgId;
|
||||||
|
@ -111,6 +113,8 @@ void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
|
||||||
SArbGroupMember *pMember = &outGroup->members[i];
|
SArbGroupMember *pMember = &outGroup->members[i];
|
||||||
pMember->info.dnodeId = pVgObj->vnodeGid[i].dnodeId;
|
pMember->info.dnodeId = pVgObj->vnodeGid[i].dnodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
||||||
|
|
|
@ -629,7 +629,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SVgObj *pVgObj = pVgroups + v;
|
SVgObj *pVgObj = pVgroups + v;
|
||||||
SArbGroup arbGroup = {0};
|
SArbGroup arbGroup = {0};
|
||||||
mndArbGroupInitFromVgObj(pVgObj, &arbGroup);
|
TAOS_CHECK_RETURN(mndArbGroupInitFromVgObj(pVgObj, &arbGroup));
|
||||||
TAOS_CHECK_RETURN(mndSetCreateArbGroupRedoLogs(pTrans, &arbGroup));
|
TAOS_CHECK_RETURN(mndSetCreateArbGroupRedoLogs(pTrans, &arbGroup));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -663,7 +663,7 @@ static int32_t mndSetCreateDbUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SVgObj *pVgObj = pVgroups + v;
|
SVgObj *pVgObj = pVgroups + v;
|
||||||
SArbGroup arbGroup = {0};
|
SArbGroup arbGroup = {0};
|
||||||
mndArbGroupInitFromVgObj(pVgObj, &arbGroup);
|
TAOS_CHECK_RETURN(mndArbGroupInitFromVgObj(pVgObj, &arbGroup));
|
||||||
TAOS_CHECK_RETURN(mndSetCreateArbGroupUndoLogs(pTrans, &arbGroup));
|
TAOS_CHECK_RETURN(mndSetCreateArbGroupUndoLogs(pTrans, &arbGroup));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -698,7 +698,7 @@ static int32_t mndSetCreateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
|
||||||
SVgObj *pVgObj = pVgroups + v;
|
SVgObj *pVgObj = pVgroups + v;
|
||||||
SArbGroup arbGroup = {0};
|
SArbGroup arbGroup = {0};
|
||||||
mndArbGroupInitFromVgObj(pVgObj, &arbGroup);
|
TAOS_CHECK_RETURN(mndArbGroupInitFromVgObj(pVgObj, &arbGroup));
|
||||||
TAOS_CHECK_RETURN(mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup));
|
TAOS_CHECK_RETURN(mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1156,44 +1156,30 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
|
if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
|
||||||
SVgObj newVgroup = {0};
|
SVgObj newVgroup = {0};
|
||||||
if ((code = mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray, &newVgroup)) != 0) {
|
TAOS_CHECK_GOTO(mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray, &newVgroup), &lino,
|
||||||
sdbCancelFetch(pSdb, pIter);
|
_err);
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
taosArrayDestroy(pArray);
|
|
||||||
TAOS_RETURN(code);
|
|
||||||
}
|
|
||||||
if (pNewDb->cfg.withArbitrator != pOldDb->cfg.withArbitrator) {
|
if (pNewDb->cfg.withArbitrator != pOldDb->cfg.withArbitrator) {
|
||||||
if (pNewDb->cfg.withArbitrator) {
|
if (pNewDb->cfg.withArbitrator) {
|
||||||
SArbGroup arbGroup = {0};
|
SArbGroup arbGroup = {0};
|
||||||
mndArbGroupInitFromVgObj(&newVgroup, &arbGroup);
|
TAOS_CHECK_GOTO(mndArbGroupInitFromVgObj(&newVgroup, &arbGroup), &lino, _err);
|
||||||
if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup)) != 0) {
|
TAOS_CHECK_GOTO(mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup), &lino, _err);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
taosArrayDestroy(pArray);
|
|
||||||
TAOS_RETURN(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SArbGroup arbGroup = {0};
|
SArbGroup arbGroup = {0};
|
||||||
mndArbGroupInitFromVgObj(pVgroup, &arbGroup);
|
TAOS_CHECK_GOTO(mndArbGroupInitFromVgObj(pVgroup, &arbGroup), &lino, _err);
|
||||||
if ((code = mndSetDropArbGroupCommitLogs(pTrans, &arbGroup)) != 0) {
|
TAOS_CHECK_GOTO(mndSetDropArbGroupCommitLogs(pTrans, &arbGroup), &lino, _err);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
|
||||||
taosArrayDestroy(pArray);
|
|
||||||
TAOS_RETURN(code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1203,6 +1189,14 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
|
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
|
||||||
|
_err:
|
||||||
|
mError("db:%s, %s failed at %d since %s", pNewDb->name, __func__, lino, tstrerror(code));
|
||||||
|
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||||
|
|
|
@ -126,7 +126,7 @@ uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint6
|
||||||
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
|
||||||
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
|
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo,
|
||||||
int32_t* num);
|
int32_t* num);
|
||||||
uint64_t tableListGetSize(const STableListInfo* pTableList);
|
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes);
|
||||||
uint64_t tableListGetSuid(const STableListInfo* pTableList);
|
uint64_t tableListGetSuid(const STableListInfo* pTableList);
|
||||||
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
|
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
|
||||||
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
|
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
|
||||||
|
|
|
@ -176,7 +176,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
||||||
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
|
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
int32_t totalTables = tableListGetSize(pTableListInfo);
|
int32_t totalTables = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &totalTables);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
int32_t capacity = 0;
|
int32_t capacity = 0;
|
||||||
|
|
||||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
@ -271,7 +274,10 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
SSDataBlock* pBufRes = pInfo->pBufferedRes;
|
SSDataBlock* pBufRes = pInfo->pBufferedRes;
|
||||||
|
|
||||||
uint64_t suid = tableListGetSuid(pTableList);
|
uint64_t suid = tableListGetSuid(pTableList);
|
||||||
int32_t size = tableListGetSize(pTableList);
|
int32_t size = 0;
|
||||||
|
code = tableListGetSize(pTableList, &size);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
(*ppRes) = NULL;
|
(*ppRes) = NULL;
|
||||||
|
|
|
@ -894,12 +894,12 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
|
||||||
|
|
||||||
int32_t rawLen = *(int32_t*)pStart;
|
int32_t rawLen = *(int32_t*)pStart;
|
||||||
pStart += sizeof(int32_t);
|
pStart += sizeof(int32_t);
|
||||||
ASSERT(compLen <= rawLen && compLen != 0);
|
QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
pNextStart = pStart + compLen;
|
pNextStart = pStart + compLen;
|
||||||
if (pRetrieveRsp->compressed && (compLen < rawLen)) {
|
if (pRetrieveRsp->compressed && (compLen < rawLen)) {
|
||||||
int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
|
int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
|
||||||
ASSERT(t == rawLen);
|
QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
pStart = pDataInfo->decompBuf;
|
pStart = pDataInfo->decompBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,6 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
|
||||||
pGroupResInfo->freeItem = true;
|
pGroupResInfo->freeItem = true;
|
||||||
pGroupResInfo->pRows = pArrayList;
|
pGroupResInfo->pRows = pArrayList;
|
||||||
pGroupResInfo->index = 0;
|
pGroupResInfo->index = 0;
|
||||||
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
|
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
|
||||||
|
@ -1560,7 +1559,12 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
if (nodeType(pNew) != QUERY_NODE_VALUE) {
|
||||||
|
nodesDestroyList(groupNew);
|
||||||
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
|
||||||
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
SValueNode* pValue = (SValueNode*)pNew;
|
SValueNode* pValue = (SValueNode*)pNew;
|
||||||
|
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
|
||||||
|
@ -1879,7 +1883,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pCond->node.aliasName);
|
||||||
pExp->pExpr->_optrRoot.pRootNode = pNode;
|
pExp->pExpr->_optrRoot.pRootNode = pNode;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -2149,7 +2154,8 @@ int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SAr
|
||||||
} else if (p->info.colId < pmInfo->colId) {
|
} else if (p->info.colId < pmInfo->colId) {
|
||||||
i++;
|
i++;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
|
||||||
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
@ -2383,9 +2389,13 @@ void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
|
||||||
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t tableListGetSize(const STableListInfo* pTableList) {
|
int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes) {
|
||||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
|
if (taosArrayGetSize(pTableList->pTableList) != taosHashGetSize(pTableList->map)) {
|
||||||
return taosArrayGetSize(pTableList->pTableList);
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
|
||||||
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
(*pRes) = taosArrayGetSize(pTableList->pTableList);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
|
uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->idInfo.suid; }
|
||||||
|
@ -2430,7 +2440,6 @@ uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tab
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
|
||||||
ASSERT(pKeyInfo->uid == tableUid);
|
|
||||||
|
|
||||||
return pKeyInfo->groupId;
|
return pKeyInfo->groupId;
|
||||||
}
|
}
|
||||||
|
@ -2457,7 +2466,8 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
|
||||||
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
||||||
code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
ASSERT(code != TSDB_CODE_DUP_KEY); // we have checked the existence of uid in hash map above
|
// we have checked the existence of uid in hash map above
|
||||||
|
QUERY_CHECK_CONDITION((code != TSDB_CODE_DUP_KEY), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
taosArrayPopTailBatch(pTableList->pTableList, 1); // let's pop the last element in the array list
|
taosArrayPopTailBatch(pTableList->pTableList, 1); // let's pop the last element in the array list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2474,7 +2484,12 @@ _end:
|
||||||
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
|
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
|
||||||
int32_t* size) {
|
int32_t* size) {
|
||||||
int32_t totalGroups = tableListGetOutputGroups(pTableList);
|
int32_t totalGroups = tableListGetOutputGroups(pTableList);
|
||||||
int32_t numOfTables = tableListGetSize(pTableList);
|
int32_t numOfTables = 0;
|
||||||
|
int32_t code = tableListGetSize(pTableList, &numOfTables);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
|
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= totalGroups) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
|
|
@ -1295,7 +1295,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
|
|
||||||
// this value may be changed if new tables are created
|
// this value may be changed if new tables are created
|
||||||
taosRLockLatch(&pTaskInfo->lock);
|
taosRLockLatch(&pTaskInfo->lock);
|
||||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (uid == 0) {
|
if (uid == 0) {
|
||||||
if (numOfTables != 0) {
|
if (numOfTables != 0) {
|
||||||
|
@ -1439,7 +1445,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
tDeleteSchemaWrapper(mtInfo.schema);
|
tDeleteSchemaWrapper(mtInfo.schema);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t size = tableListGetSize(pTableListInfo);
|
int32_t size = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &size);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
tDeleteSchemaWrapper(mtInfo.schema);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
|
code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
|
||||||
NULL, (void**)&pInfo->dataReader, NULL, NULL);
|
NULL, (void**)&pInfo->dataReader, NULL, NULL);
|
||||||
|
@ -1520,7 +1532,10 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
|
||||||
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
|
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
|
||||||
QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);
|
||||||
|
|
||||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||||
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);
|
||||||
|
|
|
@ -1115,7 +1115,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
|
||||||
pDeleterParam->suid = tableListGetSuid(pTableListInfo);
|
pDeleterParam->suid = tableListGetSuid(pTableListInfo);
|
||||||
|
|
||||||
// TODO extract uid list
|
// TODO extract uid list
|
||||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
taosMemoryFree(pDeleterParam);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
|
pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
|
||||||
if (NULL == pDeleterParam->pUidList) {
|
if (NULL == pDeleterParam->pUidList) {
|
||||||
taosMemoryFree(pDeleterParam);
|
taosMemoryFree(pDeleterParam);
|
||||||
|
|
|
@ -213,7 +213,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
||||||
memcpy(pkey->pData, val, dataLen);
|
memcpy(pkey->pData, val, dataLen);
|
||||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
memcpy(pkey->pData, val, varDataTLen(val));
|
memcpy(pkey->pData, val, varDataTLen(val));
|
||||||
ASSERT(varDataTLen(val) <= pkey->bytes);
|
|
||||||
} else {
|
} else {
|
||||||
memcpy(pkey->pData, val, pkey->bytes);
|
memcpy(pkey->pData, val, pkey->bytes);
|
||||||
}
|
}
|
||||||
|
@ -241,7 +240,6 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
||||||
varDataCopy(pStart, pkey->pData);
|
varDataCopy(pStart, pkey->pData);
|
||||||
pStart += varDataTLen(pkey->pData);
|
pStart += varDataTLen(pkey->pData);
|
||||||
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
|
|
||||||
} else {
|
} else {
|
||||||
memcpy(pStart, pkey->pData, pkey->bytes);
|
memcpy(pStart, pkey->pData, pkey->bytes);
|
||||||
pStart += pkey->bytes;
|
pStart += pkey->bytes;
|
||||||
|
@ -740,7 +738,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
memcpy(data + (*columnLen), src, dataLen);
|
memcpy(data + (*columnLen), src, dataLen);
|
||||||
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
|
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
|
||||||
ASSERT(v > 0);
|
QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
contentLen = dataLen;
|
contentLen = dataLen;
|
||||||
} else {
|
} else {
|
||||||
|
@ -748,7 +746,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
char* src = colDataGetData(pColInfoData, j);
|
char* src = colDataGetData(pColInfoData, j);
|
||||||
memcpy(data + (*columnLen), src, varDataTLen(src));
|
memcpy(data + (*columnLen), src, varDataTLen(src));
|
||||||
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
|
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
|
||||||
ASSERT(v > 0);
|
QUERY_CHECK_CONDITION((v > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
contentLen = varDataTLen(src);
|
contentLen = varDataTLen(src);
|
||||||
}
|
}
|
||||||
|
@ -762,7 +760,8 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
colDataSetNull_f(bitmap, (*rows));
|
colDataSetNull_f(bitmap, (*rows));
|
||||||
} else {
|
} else {
|
||||||
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
|
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
|
||||||
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
|
QUERY_CHECK_CONDITION(((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf)), code,
|
||||||
|
lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
contentLen = bytes;
|
contentLen = bytes;
|
||||||
}
|
}
|
||||||
|
@ -1299,7 +1298,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
|
||||||
SSDataBlock* pDest = pInfo->binfo.pRes;
|
SSDataBlock* pDest = pInfo->binfo.pRes;
|
||||||
ASSERT(hasRemainPartion(pInfo));
|
QUERY_CHECK_CONDITION((hasRemainPartion(pInfo)), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
|
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
|
||||||
blockDataCleanup(pDest);
|
blockDataCleanup(pDest);
|
||||||
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
|
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
|
||||||
|
@ -1343,7 +1342,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
||||||
pDest->info.id.groupId = pParInfo->groupId;
|
pDest->info.id.groupId = pParInfo->groupId;
|
||||||
pOperator->resultInfo.totalRows += pDest->info.rows;
|
pOperator->resultInfo.totalRows += pDest->info.rows;
|
||||||
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
|
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
|
||||||
ASSERT(pDest->info.rows > 0);
|
QUERY_CHECK_CONDITION((pDest->info.rows > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1549,7 +1548,8 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock**
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
ASSERTS(0, "invalid SSDataBlock type");
|
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
|
|
|
@ -1083,7 +1083,10 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
|
||||||
return getBlockForEmptyTable(pOperator, pStart);
|
return getBlockForEmptyTable(pOperator, pStart);
|
||||||
}
|
}
|
||||||
} else { // group by tag + no sort
|
} else { // group by tag + no sort
|
||||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &numOfTables);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
|
if (pTableScanInfo->tableEndIndex + 1 >= numOfTables) {
|
||||||
// get empty group, mark processed & rm from hash
|
// get empty group, mark processed & rm from hash
|
||||||
void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL);
|
void* pIte = taosHashIterate(pTableListInfo->remainGroups, NULL);
|
||||||
|
@ -1172,7 +1175,10 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pInfo = pOperator->info;
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
if (pOperator->dynamicTask) {
|
if (pOperator->dynamicTask) {
|
||||||
|
@ -1305,7 +1311,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
// scan table one by one sequentially
|
// scan table one by one sequentially
|
||||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||||
int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
STableKeyInfo tInfo = {0};
|
STableKeyInfo tInfo = {0};
|
||||||
pInfo->countState = TABLE_COUNT_STATE_END;
|
pInfo->countState = TABLE_COUNT_STATE_END;
|
||||||
|
|
||||||
|
@ -1320,7 +1326,13 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
pInfo->currentTable++;
|
pInfo->currentTable++;
|
||||||
|
|
||||||
taosRLockLatch(&pTaskInfo->lock);
|
taosRLockLatch(&pTaskInfo->lock);
|
||||||
numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
numOfTables = 0;
|
||||||
|
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
lino = __LINE__;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
if (pInfo->currentTable >= numOfTables) {
|
if (pInfo->currentTable >= numOfTables) {
|
||||||
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
||||||
|
@ -3613,7 +3625,9 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
||||||
QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno);
|
||||||
|
|
||||||
// Transfer the Array of STableKeyInfo into uid list.
|
// Transfer the Array of STableKeyInfo into uid list.
|
||||||
size_t size = tableListGetSize(pTableListInfo);
|
int32_t size = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &size);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||||
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pkeyInfo, code, lino, _end, terrno);
|
||||||
|
@ -4629,7 +4643,13 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
int32_t size = tableListGetSize(pInfo->pTableListInfo);
|
int32_t size = 0;
|
||||||
|
code = tableListGetSize(pInfo->pTableListInfo, &size);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
(*ppRes) = NULL;
|
(*ppRes) = NULL;
|
||||||
|
@ -4927,7 +4947,13 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
|
||||||
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
static int32_t setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
|
||||||
pInfo->bGroupProcessed = false;
|
pInfo->bGroupProcessed = false;
|
||||||
|
|
||||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
int32_t code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t i = pInfo->tableStartIndex + 1;
|
int32_t i = pInfo->tableStartIndex + 1;
|
||||||
for (; i < numOfTables; ++i) {
|
for (; i < numOfTables; ++i) {
|
||||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||||
|
@ -5266,7 +5292,10 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t tableListSize = 0;
|
||||||
|
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (!pInfo->hasGroupId) {
|
if (!pInfo->hasGroupId) {
|
||||||
pInfo->hasGroupId = true;
|
pInfo->hasGroupId = true;
|
||||||
|
|
||||||
|
@ -5643,7 +5672,10 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
qDebug("%s table merge scan start group %" PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
|
qDebug("%s table merge scan start group %" PRIu64, GET_TASKID(pTaskInfo), pInfo->groupId);
|
||||||
|
|
||||||
{
|
{
|
||||||
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t numOfTables = 0;
|
||||||
|
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
int32_t i = pInfo->tableStartIndex + 1;
|
int32_t i = pInfo->tableStartIndex + 1;
|
||||||
for (; i < numOfTables; ++i) {
|
for (; i < numOfTables; ++i) {
|
||||||
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
|
||||||
|
@ -5763,7 +5795,10 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
|
int32_t tableListSize = 0;
|
||||||
|
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (!pInfo->hasGroupId) {
|
if (!pInfo->hasGroupId) {
|
||||||
pInfo->hasGroupId = true;
|
pInfo->hasGroupId = true;
|
||||||
|
|
||||||
|
|
|
@ -2843,7 +2843,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
pInfo->pTableListInfo = pTableListInfo;
|
pInfo->pTableListInfo = pTableListInfo;
|
||||||
size_t num = tableListGetSize(pTableListInfo);
|
int32_t num = 0;
|
||||||
|
code = tableListGetSize(pTableListInfo, &num);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||||
|
|
||||||
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
|
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
|
||||||
|
|
|
@ -324,8 +324,11 @@ _end:
|
||||||
|
|
||||||
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
|
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
|
||||||
|
|
||||||
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, bool reset) {
|
static int32_t copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, bool reset) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
|
||||||
|
QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
|
||||||
pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];
|
pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
|
@ -342,15 +345,24 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVa
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||||
|
QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
|
||||||
|
|
||||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||||
|
|
||||||
saveColData(pRowVal->pRowVal, i, p, reset ? true : isNull);
|
saveColData(pRowVal->pRowVal, i, p, reset ? true : isNull);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
||||||
|
@ -362,20 +374,18 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||||
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
||||||
|
|
||||||
#if 0
|
|
||||||
ASSERT(ascFill && (pFillInfo->currentKey >= pFillInfo->start) || (!ascFill && (pFillInfo->currentKey <= pFillInfo->start)));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
while (pFillInfo->numOfCurrent < outputRows) {
|
while (pFillInfo->numOfCurrent < outputRows) {
|
||||||
int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
|
int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];
|
||||||
|
|
||||||
// set the next value for interpolation
|
// set the next value for interpolation
|
||||||
if (pFillInfo->currentKey < ts && ascFill) {
|
if (pFillInfo->currentKey < ts && ascFill) {
|
||||||
SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev;
|
SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev;
|
||||||
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else if (pFillInfo->currentKey > ts && !ascFill) {
|
} else if (pFillInfo->currentKey > ts && !ascFill) {
|
||||||
SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->prev : &pFillInfo->next;
|
SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->prev : &pFillInfo->next;
|
||||||
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
|
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
|
||||||
|
@ -392,21 +402,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pFillInfo->currentKey == ts);
|
QUERY_CHECK_CONDITION((pFillInfo->currentKey == ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
int32_t index = pBlock->info.rows;
|
int32_t index = pBlock->info.rows;
|
||||||
|
|
||||||
int32_t nextRowIndex = pFillInfo->index + 1;
|
int32_t nextRowIndex = pFillInfo->index + 1;
|
||||||
if (pFillInfo->type == TSDB_FILL_NEXT) {
|
if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||||
if ((pFillInfo->index + 1) < pFillInfo->numOfRows) {
|
if ((pFillInfo->index + 1) < pFillInfo->numOfRows) {
|
||||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, false);
|
code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else {
|
} else {
|
||||||
// reset to null after last row
|
// reset to null after last row
|
||||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||||
if (nextRowIndex + 1 >= pFillInfo->numOfRows && !FILL_IS_ASC_FILL(pFillInfo)) {
|
if (nextRowIndex + 1 >= pFillInfo->numOfRows && !FILL_IS_ASC_FILL(pFillInfo)) {
|
||||||
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,7 +513,9 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
|
static int32_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
/*
|
/*
|
||||||
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
|
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
|
||||||
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
||||||
|
@ -512,7 +527,14 @@ static void appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_
|
||||||
|
|
||||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||||
|
|
||||||
ASSERT(pFillInfo->numOfCurrent == resultCapacity);
|
QUERY_CHECK_CONDITION((pFillInfo->numOfCurrent == resultCapacity), code, lino, _end,
|
||||||
|
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
|
||||||
|
@ -635,15 +657,6 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
|
||||||
|
|
||||||
// the endKey is now the aligned time window value. truncate time window isn't correct.
|
// the endKey is now the aligned time window value. truncate time window isn't correct.
|
||||||
pFillInfo->end = endKey;
|
pFillInfo->end = endKey;
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (pFillInfo->order == TSDB_ORDER_ASC) {
|
|
||||||
ASSERT(pFillInfo->start <= pFillInfo->end);
|
|
||||||
} else {
|
|
||||||
ASSERT(pFillInfo->start >= pFillInfo->end);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pFillInfo->index = 0;
|
pFillInfo->index = 0;
|
||||||
pFillInfo->numOfRows = numOfRows;
|
pFillInfo->numOfRows = numOfRows;
|
||||||
}
|
}
|
||||||
|
@ -687,7 +700,6 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
|
||||||
numOfRes =
|
numOfRes =
|
||||||
taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
|
taosTimeCountIntervalForFill(lastKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
|
||||||
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
|
pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
|
||||||
ASSERT(numOfRes >= numOfRows);
|
|
||||||
} else { // reach the end of data
|
} else { // reach the end of data
|
||||||
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
|
if ((ekey1 < pFillInfo->currentKey && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||||
(ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
|
(ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) {
|
||||||
|
@ -719,23 +731,30 @@ void taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* po
|
||||||
|
|
||||||
int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
|
int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||||
|
|
||||||
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
|
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
|
||||||
ASSERT(numOfRes <= capacity);
|
QUERY_CHECK_CONDITION((numOfRes <= capacity), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
|
|
||||||
// no data existed for fill operation now, append result according to the fill strategy
|
// no data existed for fill operation now, append result according to the fill strategy
|
||||||
if (remain == 0) {
|
if (remain == 0) {
|
||||||
appendFilledResult(pFillInfo, p, numOfRes);
|
code = appendFilledResult(pFillInfo, p, numOfRes);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else {
|
} else {
|
||||||
code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
|
code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
|
||||||
ASSERT(numOfRes == pFillInfo->numOfCurrent);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
QUERY_CHECK_CONDITION((numOfRes == pFillInfo->numOfCurrent), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
|
qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
|
||||||
", current : % d, total : % d, %s",
|
", current : % d, total : % d, %s",
|
||||||
pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
|
pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
|
||||||
pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);
|
pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -331,18 +331,26 @@ int32_t doGeosRelation(const GEOSGeometry *geom1, const GEOSPreparedGeometry *pr
|
||||||
|
|
||||||
if (!preparedGeom1) {
|
if (!preparedGeom1) {
|
||||||
if (!swapped) {
|
if (!swapped) {
|
||||||
ASSERT(relationFn);
|
if (!relationFn) {
|
||||||
|
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
|
||||||
|
}
|
||||||
*res = relationFn(geosCtx->handle, geom1, geom2);
|
*res = relationFn(geosCtx->handle, geom1, geom2);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(swappedRelationFn);
|
if (!swappedRelationFn) {
|
||||||
|
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
|
||||||
|
}
|
||||||
*res = swappedRelationFn(geosCtx->handle, geom1, geom2);
|
*res = swappedRelationFn(geosCtx->handle, geom1, geom2);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (!swapped) {
|
if (!swapped) {
|
||||||
ASSERT(preparedRelationFn);
|
if (!preparedRelationFn) {
|
||||||
|
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
|
||||||
|
}
|
||||||
*res = preparedRelationFn(geosCtx->handle, preparedGeom1, geom2);
|
*res = preparedRelationFn(geosCtx->handle, preparedGeom1, geom2);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(swappedPreparedRelationFn);
|
if (!swappedPreparedRelationFn) {
|
||||||
|
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
|
||||||
|
}
|
||||||
*res = swappedPreparedRelationFn(geosCtx->handle, preparedGeom1, geom2);
|
*res = swappedPreparedRelationFn(geosCtx->handle, preparedGeom1, geom2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -391,7 +399,9 @@ int32_t readGeometry(const unsigned char *input, GEOSGeometry **outputGeom,
|
||||||
const GEOSPreparedGeometry **outputPreparedGeom) {
|
const GEOSPreparedGeometry **outputPreparedGeom) {
|
||||||
SGeosContext *geosCtx = getThreadLocalGeosCtx();
|
SGeosContext *geosCtx = getThreadLocalGeosCtx();
|
||||||
|
|
||||||
ASSERT(outputGeom); // it is not allowed if outputGeom is NULL
|
if (!outputGeom) {
|
||||||
|
return TSDB_CODE_FUNC_FUNTION_PARA_VALUE;
|
||||||
|
}
|
||||||
*outputGeom = NULL;
|
*outputGeom = NULL;
|
||||||
|
|
||||||
if (outputPreparedGeom) { // it means not to generate PreparedGeometry if outputPreparedGeom is NULL
|
if (outputPreparedGeom) { // it means not to generate PreparedGeometry if outputPreparedGeom is NULL
|
||||||
|
|
|
@ -66,7 +66,7 @@ void setScalarParam(SScalarParam *sclParam, int32_t type, void *valueArray, TDRo
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
ASSERT(0);
|
ASSERT_TRUE(false);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,6 +79,7 @@ _error:
|
||||||
|
|
||||||
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) {
|
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) {
|
||||||
if (tBloomFilterIsFull(pBF)) {
|
if (tBloomFilterIsFull(pBF)) {
|
||||||
|
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
bool hasChange = false;
|
bool hasChange = false;
|
||||||
|
|
|
@ -118,7 +118,7 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
||||||
ASSERT(pNormalBf);
|
QUERY_CHECK_NULL(pNormalBf, code, lino, _end, terrno);
|
||||||
if (tBloomFilterIsFull(pNormalBf)) {
|
if (tBloomFilterIsFull(pNormalBf)) {
|
||||||
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||||
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
|
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
|
||||||
|
|
|
@ -0,0 +1,192 @@
|
||||||
|
from frame.log import *
|
||||||
|
from frame.cases import *
|
||||||
|
from frame.sql import *
|
||||||
|
from frame.caseBase import *
|
||||||
|
from frame import *
|
||||||
|
from frame.eos import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase(TBase):
|
||||||
|
"""Add test case to verify TD-30816 (last/last_row accuracy)
|
||||||
|
"""
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def prepare_data(self):
|
||||||
|
tdSql.execute("create database db_td30816 cachemodel 'both';")
|
||||||
|
tdSql.execute("use db_td30816;")
|
||||||
|
# create regular table
|
||||||
|
tdSql.execute("create table rt_int (ts timestamp, c1 int primary key, c2 int);")
|
||||||
|
tdSql.execute("create table rt_str (ts timestamp, c1 varchar(16) primary key, c2 varchar(16));")
|
||||||
|
|
||||||
|
# create stable
|
||||||
|
tdSql.execute("create table st_pk_int (ts timestamp, c1 int primary key, c2 int) tags (t1 int);")
|
||||||
|
tdSql.execute("create table st_pk_str (ts timestamp, c1 varchar(16) primary key, c2 varchar(16)) tags (t1 int);")
|
||||||
|
|
||||||
|
# create child table
|
||||||
|
tdSql.execute("create table ct1 using st_pk_int tags(1);")
|
||||||
|
tdSql.execute("create table ct2 using st_pk_int tags(2);")
|
||||||
|
|
||||||
|
tdSql.execute("create table ct3 using st_pk_str tags(3);")
|
||||||
|
tdSql.execute("create table ct4 using st_pk_str tags(4);")
|
||||||
|
|
||||||
|
# insert data to regular table
|
||||||
|
tdSql.execute("insert into rt_int values ('2021-01-01 00:00:00', 1, NULL);")
|
||||||
|
tdSql.execute("insert into rt_int values ('2021-01-01 00:00:01', 2, 1);")
|
||||||
|
tdSql.execute("insert into rt_str values ('2021-01-01 00:00:00', 'a', NULL);")
|
||||||
|
tdSql.execute("insert into rt_str values ('2021-01-01 00:00:01', 'b', '1');")
|
||||||
|
|
||||||
|
# insert data to child table
|
||||||
|
tdSql.execute("insert into ct1 values ('2021-01-01 00:00:00', 1, 1);")
|
||||||
|
tdSql.execute("insert into ct1 values ('2021-01-01 00:00:01', 2, NULL);")
|
||||||
|
tdSql.execute("insert into ct2 values ('2021-01-01 00:00:00', 3, 3);")
|
||||||
|
tdSql.execute("insert into ct2 values ('2021-01-01 00:00:01', 4, NULL);")
|
||||||
|
|
||||||
|
tdSql.execute("insert into ct3 values ('2021-01-01 00:00:00', 'a', '1');")
|
||||||
|
tdSql.execute("insert into ct3 values ('2021-01-01 00:00:01', 'b', NULL);")
|
||||||
|
tdSql.execute("insert into ct4 values ('2021-01-01 00:00:00', 'c', '3');")
|
||||||
|
tdSql.execute("insert into ct4 values ('2021-01-01 00:00:01', 'd', NULL);")
|
||||||
|
|
||||||
|
def test_last_with_primarykey_int_ct(self):
|
||||||
|
tdSql.execute("use db_td30816;")
|
||||||
|
tdSql.query("select last(*) from st_pk_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 4)
|
||||||
|
tdSql.checkData(0, 2, 3)
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from st_pk_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 4)
|
||||||
|
tdSql.checkData(0, 2, None)
|
||||||
|
|
||||||
|
# delete and insert data
|
||||||
|
tdSql.execute("delete from ct1 where ts='2021-01-01 00:00:01';")
|
||||||
|
tdSql.execute("delete from ct2 where ts='2021-01-01 00:00:01';")
|
||||||
|
tdSql.execute("insert into ct1 values ('2021-01-01 00:00:00', 0, 5);")
|
||||||
|
tdSql.execute("insert into ct2 values ('2021-01-01 00:00:00', -1, 6);")
|
||||||
|
tdSql.query("select last(*) from st_pk_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 3)
|
||||||
|
tdSql.checkData(0, 2, 3)
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from st_pk_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 3)
|
||||||
|
tdSql.checkData(0, 2, 3)
|
||||||
|
tdLog.info("Finish test_last_with_primarykey_int_ct")
|
||||||
|
|
||||||
|
def test_last_with_primarykey_str_ct(self):
|
||||||
|
tdSql.execute("use db_td30816;")
|
||||||
|
tdSql.query("select last(*) from st_pk_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 'd')
|
||||||
|
tdSql.checkData(0, 2, '3')
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from st_pk_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 'd')
|
||||||
|
tdSql.checkData(0, 2, None)
|
||||||
|
|
||||||
|
# delete and insert data
|
||||||
|
tdSql.execute("delete from ct3 where ts='2021-01-01 00:00:01';")
|
||||||
|
tdSql.execute("delete from ct4 where ts='2021-01-01 00:00:01';")
|
||||||
|
tdSql.execute("insert into ct3 values ('2021-01-01 00:00:00', '6', '5');")
|
||||||
|
tdSql.execute("insert into ct4 values ('2021-01-01 00:00:00', '7', '6');")
|
||||||
|
|
||||||
|
tdSql.query("select last(*) from st_pk_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 'c')
|
||||||
|
tdSql.checkData(0, 2, '3')
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from st_pk_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 'c')
|
||||||
|
tdSql.checkData(0, 2, 3)
|
||||||
|
tdLog.info("Finish test_last_with_primarykey_str_ct")
|
||||||
|
|
||||||
|
def test_last_with_primarykey_int_rt(self):
|
||||||
|
tdSql.execute("use db_td30816;")
|
||||||
|
tdSql.query("select last(*) from rt_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 2)
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from rt_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 2)
|
||||||
|
tdSql.checkData(0, 2, 1)
|
||||||
|
|
||||||
|
tdSql.execute("delete from rt_int where ts='2021-01-01 00:00:01';")
|
||||||
|
tdSql.execute("insert into rt_int values ('2021-01-01 00:00:00', 0, 5);")
|
||||||
|
|
||||||
|
tdSql.query("select last(*) from rt_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 1)
|
||||||
|
tdSql.checkData(0, 2, 5)
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from rt_int;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 1)
|
||||||
|
tdSql.checkData(0, 2, None)
|
||||||
|
tdLog.info("Finish test_last_with_primarykey_int_rt")
|
||||||
|
|
||||||
|
def test_last_with_primarykey_str_rt(self):
|
||||||
|
tdSql.execute("use db_td30816;")
|
||||||
|
tdSql.query("select last(*) from rt_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 'b')
|
||||||
|
tdSql.checkData(0, 2, '1')
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from rt_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:01')
|
||||||
|
tdSql.checkData(0, 1, 'b')
|
||||||
|
tdSql.checkData(0, 2, '1')
|
||||||
|
|
||||||
|
tdSql.execute("delete from rt_str where ts='2021-01-01 00:00:01';")
|
||||||
|
tdSql.execute("insert into rt_str values ('2021-01-01 00:00:00', '2', '5');")
|
||||||
|
|
||||||
|
tdSql.query("select last(*) from rt_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 'a')
|
||||||
|
tdSql.checkData(0, 2, '5')
|
||||||
|
|
||||||
|
tdSql.query("select last_row(*) from rt_str;")
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
tdSql.checkData(0, 0, '2021-01-01 00:00:00')
|
||||||
|
tdSql.checkData(0, 1, 'a')
|
||||||
|
tdSql.checkData(0, 2, None)
|
||||||
|
tdLog.info("Finish test_last_with_primarykey_str_rt")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.prepare_data()
|
||||||
|
# regular table
|
||||||
|
self.test_last_with_primarykey_int_rt()
|
||||||
|
self.test_last_with_primarykey_str_rt()
|
||||||
|
# child tables
|
||||||
|
self.test_last_with_primarykey_int_ct()
|
||||||
|
self.test_last_with_primarykey_str_ct()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.execute("drop database db_td30816;")
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -38,6 +38,7 @@
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f query/queryBugs.py -N 3
|
,,y,army,./pytest.sh python3 ./test.py -f query/queryBugs.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f tmq/tmqBugs.py -N 3
|
,,y,army,./pytest.sh python3 ./test.py -f tmq/tmqBugs.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py
|
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_compare_asc_desc.py
|
||||||
|
,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py
|
||||||
|
|
||||||
#
|
#
|
||||||
# system test
|
# system test
|
||||||
|
@ -310,7 +311,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellError.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellError.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShellNetChk.py
|
,,n,system-test,python3 ./test.py -f 0-others/taosShellNetChk.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/telemetry.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/backquote_check.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/backquote_check.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosdMonitor.py
|
||||||
|
|
|
@ -49,6 +49,7 @@ fi
|
||||||
|
|
||||||
indirect_leak=$(cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l)
|
indirect_leak=$(cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l)
|
||||||
python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l)
|
python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l)
|
||||||
|
python_taos_error=$(cat ${LOG_DIR}/*.info |grep "#" | grep -w "TDinternal" | wc -l)
|
||||||
|
|
||||||
# ignore
|
# ignore
|
||||||
|
|
||||||
|
@ -84,17 +85,18 @@ echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
|
||||||
echo -e "\033[44;32;1m"asan indirect_leak: $indirect_leak"\033[0m"
|
echo -e "\033[44;32;1m"asan indirect_leak: $indirect_leak"\033[0m"
|
||||||
echo -e "\033[44;32;1m"asan runtime error: $runtime_error"\033[0m"
|
echo -e "\033[44;32;1m"asan runtime error: $runtime_error"\033[0m"
|
||||||
echo -e "\033[44;32;1m"asan python error: $python_error"\033[0m"
|
echo -e "\033[44;32;1m"asan python error: $python_error"\033[0m"
|
||||||
|
echo -e "\033[44;32;1m"asan python taos error: $python_taos_error"\033[0m"
|
||||||
|
|
||||||
let "errors=$error_num+$memory_leak+$indirect_leak+$runtime_error+$python_error"
|
let "errors=$error_num+$memory_leak+$indirect_leak+$runtime_error+$python_error+$python_taos_error"
|
||||||
|
|
||||||
if [ $errors -eq 0 ]; then
|
if [ $errors -eq 0 ]; then
|
||||||
echo -e "\033[44;32;1m"no asan errors"\033[0m"
|
echo -e "\033[44;32;1m"no asan errors"\033[0m"
|
||||||
exit 0
|
exit 0
|
||||||
else
|
else
|
||||||
echo -e "\033[44;31;1m"asan total errors: $errors"\033[0m"
|
echo -e "\033[44;31;1m"asan total errors: $errors"\033[0m"
|
||||||
if [ $python_error -ne 0 ]; then
|
if [ $python_error -ne 0 ] || [ $python_taos_error -ne 0 ] ; then
|
||||||
cat ${LOG_DIR}/*.info
|
cat ${LOG_DIR}/*.info |grep "#" | grep -w "TDinternal"
|
||||||
fi
|
fi
|
||||||
cat ${LOG_DIR}/*.asan
|
cat ${LOG_DIR}/*.asan
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
Loading…
Reference in New Issue