[td-225] fix bugs found by sim.
This commit is contained in:
parent
02bdbce35a
commit
9be35563a2
|
@ -167,9 +167,14 @@ typedef struct SDataBlockInfo {
|
||||||
} SDataBlockInfo;
|
} SDataBlockInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
size_t numOfTables;
|
void *pTable;
|
||||||
|
TSKEY lastKey;
|
||||||
|
} STableKeyInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
size_t numOfTables;
|
||||||
SArray *pGroupList;
|
SArray *pGroupList;
|
||||||
SHashObj *map; // speedup acquire the tableQueryInfo from STableId
|
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
|
||||||
} STableGroupInfo;
|
} STableGroupInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -177,24 +182,24 @@ typedef struct {
|
||||||
*
|
*
|
||||||
* @param tsdb tsdb handle
|
* @param tsdb tsdb handle
|
||||||
* @param pCond query condition, including time window, result set order, and basic required columns for each block
|
* @param pCond query condition, including time window, result set order, and basic required columns for each block
|
||||||
* @param tableqinfoGroupInfo tableId list in the form of set, seperated into different groups according to group by condition
|
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
|
||||||
|
* group by condition
|
||||||
* @param qinfo query info handle from query processor
|
* @param qinfo query info handle from query processor
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
|
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
|
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
|
||||||
* Note that only one data block with only row will be returned while invoking retrieve data block function for
|
* Note that only one data block with only row will be returned while invoking retrieve data block function for
|
||||||
* all tables in this group.
|
* all tables in this group.
|
||||||
*
|
*
|
||||||
* @param tsdb tsdb handle
|
* @param tsdb tsdb handle
|
||||||
* @param pCond query condition, including time window, result set order, and basic required columns for each
|
* @param pCond query condition, including time window, result set order, and basic required columns for each block
|
||||||
* block
|
* @param tableInfo table list.
|
||||||
* @param tableqinfoGroupInfo tableId list.
|
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
|
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get the queried table object list
|
* get the queried table object list
|
||||||
|
@ -260,7 +265,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL
|
||||||
* @param stableid. super table sid
|
* @param stableid. super table sid
|
||||||
* @param pTagCond. tag query condition
|
* @param pTagCond. tag query condition
|
||||||
*/
|
*/
|
||||||
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pTagCond, size_t len,
|
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
|
||||||
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
|
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
|
||||||
SColIndex *pColIndex, int32_t numOfCols);
|
SColIndex *pColIndex, int32_t numOfCols);
|
||||||
|
|
||||||
|
@ -278,7 +283,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
||||||
* @param pGroupInfo the generated result
|
* @param pGroupInfo the generated result
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
|
@ -73,7 +73,6 @@ int32_t mnodeInitProfile() {
|
||||||
|
|
||||||
void mnodeCleanupProfile() {
|
void mnodeCleanupProfile() {
|
||||||
if (tsMnodeConnCache != NULL) {
|
if (tsMnodeConnCache != NULL) {
|
||||||
mInfo("conn cache is cleanup");
|
|
||||||
taosCacheCleanup(tsMnodeConnCache);
|
taosCacheCleanup(tsMnodeConnCache);
|
||||||
tsMnodeConnCache = NULL;
|
tsMnodeConnCache = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,8 +186,6 @@ typedef struct SQInfo {
|
||||||
void* signature;
|
void* signature;
|
||||||
int32_t pointsInterpo;
|
int32_t pointsInterpo;
|
||||||
int32_t code; // error code to returned to client
|
int32_t code; // error code to returned to client
|
||||||
// sem_t dataReady;
|
|
||||||
|
|
||||||
void* tsdb;
|
void* tsdb;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
|
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
|
||||||
|
|
|
@ -1813,10 +1813,14 @@ static void doExchangeTimeWindow(SQInfo* pQInfo) {
|
||||||
for(int32_t i = 0; i < t; ++i) {
|
for(int32_t i = 0; i < t; ++i) {
|
||||||
SArray* p1 = GET_TABLEGROUP(pQInfo, i);
|
SArray* p1 = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
|
SArray* tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
|
||||||
size_t len = taosArrayGetSize(p1);
|
size_t len = taosArrayGetSize(p1);
|
||||||
for(int32_t j = 0; j < len; ++j) {
|
for(int32_t j = 0; j < len; ++j) {
|
||||||
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
|
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
|
||||||
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
||||||
|
|
||||||
|
STableKeyInfo* pInfo = taosArrayGet(tableKeyGroup, j);
|
||||||
|
pInfo->lastKey = pTableQueryInfo->win.skey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2925,7 +2929,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// order has change already!
|
// order has changed already
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
// TODO validate the assertion
|
// TODO validate the assertion
|
||||||
|
@ -2934,9 +2938,13 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
|
||||||
// } else {
|
// } else {
|
||||||
// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
if (pTableQueryInfo->lastKey == pTableQueryInfo->win.skey) {
|
||||||
|
// do nothing, no results
|
||||||
|
} else {
|
||||||
|
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
||||||
|
}
|
||||||
|
|
||||||
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
||||||
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
||||||
|
|
||||||
|
@ -2998,16 +3006,26 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void setupQueryRangeForReverseScan(SQInfo* pQInfo) {
|
||||||
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
SArray *tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
|
||||||
|
|
||||||
size_t t = taosArrayGetSize(group);
|
size_t t = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < t; ++j) {
|
for (int32_t j = 0; j < t; ++j) {
|
||||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||||
updateTableQueryInfoForReverseScan(pQuery, pCheckInfo);
|
updateTableQueryInfoForReverseScan(pQuery, pCheckInfo);
|
||||||
|
|
||||||
|
// update the last key in tableKeyInfo list
|
||||||
|
STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
|
||||||
|
pTableKeyInfo->lastKey = pCheckInfo->lastKey;
|
||||||
|
|
||||||
|
assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3252,20 +3270,20 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
|
switchCtxOrder(pRuntimeEnv);
|
||||||
|
disableFuncInReverseScan(pQInfo);
|
||||||
|
setupQueryRangeForReverseScan(pQInfo);
|
||||||
|
|
||||||
// clean unused handle
|
// clean unused handle
|
||||||
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
// add ref for table
|
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||||
longjmp(pRuntimeEnv->env, terrno);
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
||||||
switchCtxOrder(pRuntimeEnv);
|
|
||||||
disableFuncInReverseScan(pQInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
|
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
|
||||||
|
@ -3290,6 +3308,13 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
|
||||||
pQuery->window = pTableQueryInfo->win;
|
pQuery->window = pTableQueryInfo->win;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void restoreTimeWindow(STableGroupInfo* pTableGroupInfo, STsdbQueryCond* pCond) {
|
||||||
|
assert(pTableGroupInfo->numOfTables == 1);
|
||||||
|
SArray* pTableKeyGroup = taosArrayGetP(pTableGroupInfo->pGroupList, 0);
|
||||||
|
STableKeyInfo* pKeyInfo = taosArrayGet(pTableKeyGroup, 0);
|
||||||
|
pKeyInfo->lastKey = pCond->twindow.skey;
|
||||||
|
}
|
||||||
|
|
||||||
void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
|
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
@ -3337,6 +3362,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
restoreTimeWindow(&pQInfo->tableGroupInfo, &cond);
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||||
longjmp(pRuntimeEnv->env, terrno);
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
@ -4409,9 +4435,11 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
||||||
SArray *tx = taosArrayInit(1, POINTER_BYTES);
|
SArray *tx = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
|
||||||
|
STableKeyInfo info = {.pTable = pCheckInfo->pTable, .lastKey = pCheckInfo->lastKey};
|
||||||
|
taosArrayPush(tx, &info);
|
||||||
|
|
||||||
taosArrayPush(tx, &pCheckInfo->pTable);
|
|
||||||
taosArrayPush(g1, &tx);
|
taosArrayPush(g1, &tx);
|
||||||
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
|
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
|
||||||
|
|
||||||
|
@ -4561,7 +4589,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
pRuntimeEnv->pQueryHandle = NULL;
|
pRuntimeEnv->pQueryHandle = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// no need to update the lastkey for each table
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
|
|
||||||
taosArrayDestroy(g1);
|
taosArrayDestroy(g1);
|
||||||
taosArrayDestroy(tx);
|
taosArrayDestroy(tx);
|
||||||
if (pRuntimeEnv->pQueryHandle == NULL) {
|
if (pRuntimeEnv->pQueryHandle == NULL) {
|
||||||
|
@ -4687,8 +4717,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
|
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
|
||||||
|
|
||||||
// if the buffer is full or group by each table, we need to jump out of the loop
|
// if the buffer is full or group by each table, we need to jump out of the loop
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4753,21 +4782,22 @@ static void doSaveContext(SQInfo *pQInfo) {
|
||||||
.colList = pQuery->colList,
|
.colList = pQuery->colList,
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
|
||||||
// clean unused handle
|
// clean unused handle
|
||||||
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
|
switchCtxOrder(pRuntimeEnv);
|
||||||
|
disableFuncInReverseScan(pQInfo);
|
||||||
|
setupQueryRangeForReverseScan(pQInfo);
|
||||||
|
|
||||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||||
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
if (pRuntimeEnv->pSecQueryHandle == NULL) {
|
||||||
longjmp(pRuntimeEnv->env, terrno);
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
||||||
switchCtxOrder(pRuntimeEnv);
|
|
||||||
disableFuncInReverseScan(pQInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doRestoreContext(SQInfo *pQInfo) {
|
static void doRestoreContext(SQInfo *pQInfo) {
|
||||||
|
@ -5861,8 +5891,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t j = 0; j < s; ++j) {
|
for(int32_t j = 0; j < s; ++j) {
|
||||||
void* pTable = taosArrayGetP(pa, j);
|
STableKeyInfo* info = taosArrayGet(pa, j);
|
||||||
STableId* id = TSDB_TABLEID(pTable);
|
STableId* id = TSDB_TABLEID(info->pTable);
|
||||||
|
|
||||||
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
|
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
|
||||||
if (pTableId != NULL ) {
|
if (pTableId != NULL ) {
|
||||||
|
@ -5872,10 +5902,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
||||||
}
|
}
|
||||||
|
|
||||||
void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo);
|
void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo);
|
||||||
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window, buf);
|
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf);
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
goto _cleanup;
|
goto _cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
item->groupIndex = i;
|
item->groupIndex = i;
|
||||||
taosArrayPush(p1, &item);
|
taosArrayPush(p1, &item);
|
||||||
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
|
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
|
||||||
|
@ -5904,6 +5935,7 @@ _cleanup_query:
|
||||||
taosArrayDestroy(pGroupbyExpr->columnInfo);
|
taosArrayDestroy(pGroupbyExpr->columnInfo);
|
||||||
free(pGroupbyExpr);
|
free(pGroupbyExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTFree(pTagCols);
|
taosTFree(pTagCols);
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
SExprInfo* pExprInfo = &pExprs[i];
|
SExprInfo* pExprInfo = &pExprs[i];
|
||||||
|
@ -5911,6 +5943,7 @@ _cleanup_query:
|
||||||
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
|
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTFree(pExprs);
|
taosTFree(pExprs);
|
||||||
|
|
||||||
_cleanup:
|
_cleanup:
|
||||||
|
@ -6198,7 +6231,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
||||||
STableIdInfo *id = taosArrayGet(pTableIdList, 0);
|
STableIdInfo *id = taosArrayGet(pTableIdList, 0);
|
||||||
|
|
||||||
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
||||||
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
|
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
||||||
|
@ -6215,8 +6248,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
||||||
code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex,
|
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, tagCond, pQueryMsg->tagCondLen,
|
||||||
numOfGroupByCols);
|
pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex, numOfGroupByCols);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
|
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
|
||||||
goto _over;
|
goto _over;
|
||||||
|
|
|
@ -172,6 +172,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
if (pQueryHandle == NULL) {
|
if (pQueryHandle == NULL) {
|
||||||
goto out_of_memory;
|
goto out_of_memory;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->order = pCond->order;
|
pQueryHandle->order = pCond->order;
|
||||||
pQueryHandle->window = pCond->twindow;
|
pQueryHandle->window = pCond->twindow;
|
||||||
pQueryHandle->pTsdb = tsdb;
|
pQueryHandle->pTsdb = tsdb;
|
||||||
|
@ -190,9 +191,6 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
|
|
||||||
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
|
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
|
||||||
|
|
||||||
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
|
||||||
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
int32_t numOfCols = pCond->numOfCols;
|
int32_t numOfCols = pCond->numOfCols;
|
||||||
|
|
||||||
|
@ -200,6 +198,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
if (pQueryHandle->statis == NULL) {
|
if (pQueryHandle->statis == NULL) {
|
||||||
goto out_of_memory;
|
goto out_of_memory;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
|
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
|
||||||
if (pQueryHandle->pColumns == NULL) {
|
if (pQueryHandle->pColumns == NULL) {
|
||||||
goto out_of_memory;
|
goto out_of_memory;
|
||||||
|
@ -221,9 +220,13 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
if (pQueryHandle->pTableCheckInfo == NULL) {
|
if (pQueryHandle->pTableCheckInfo == NULL) {
|
||||||
goto out_of_memory;
|
goto out_of_memory;
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||||
assert(pMeta != NULL);
|
assert(pMeta != NULL);
|
||||||
|
|
||||||
|
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
|
||||||
|
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
||||||
|
|
||||||
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
||||||
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
|
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
|
||||||
|
|
||||||
|
@ -231,17 +234,23 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
assert(gsize > 0);
|
assert(gsize > 0);
|
||||||
|
|
||||||
for (int32_t j = 0; j < gsize; ++j) {
|
for (int32_t j = 0; j < gsize; ++j) {
|
||||||
STable* pTable = (STable*) taosArrayGetP(group, j);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
|
||||||
|
|
||||||
STableCheckInfo info = {
|
STableCheckInfo info = {
|
||||||
.lastKey = pQueryHandle->window.skey,
|
.lastKey = pKeyInfo->lastKey,
|
||||||
.tableId = pTable->tableId,
|
.tableId = ((STable*)(pKeyInfo->pTable))->tableId,
|
||||||
.pTableObj = pTable,
|
.pTableObj = pKeyInfo->pTable,
|
||||||
};
|
};
|
||||||
|
|
||||||
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
|
||||||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
|
||||||
|
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
assert(info.lastKey >= pQueryHandle->window.skey);
|
||||||
|
} else {
|
||||||
|
assert(info.lastKey <= pQueryHandle->window.skey);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -315,19 +324,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
|
|
||||||
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
|
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
|
||||||
|
|
||||||
|
STableData* pMem = NULL;
|
||||||
|
STableData* pIMem = NULL;
|
||||||
|
|
||||||
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables) {
|
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables) {
|
||||||
STableData* ptd = pHandle->mem->tData[pCheckInfo->tableId.tid];
|
pMem = pHandle->mem->tData[pCheckInfo->tableId.tid];
|
||||||
if (ptd != NULL && ptd->uid == pCheckInfo->tableId.uid) { // check uid
|
if (pMem != NULL && pMem->uid == pCheckInfo->tableId.uid) { // check uid
|
||||||
pCheckInfo->iter =
|
pCheckInfo->iter =
|
||||||
tSkipListCreateIterFromVal(ptd->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
tSkipListCreateIterFromVal(pMem->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables) {
|
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables) {
|
||||||
STableData* ptd = pHandle->imem->tData[pCheckInfo->tableId.tid];
|
pIMem = pHandle->imem->tData[pCheckInfo->tableId.tid];
|
||||||
if (ptd != NULL && ptd->uid == pCheckInfo->tableId.uid) { // check uid
|
if (pIMem != NULL && pIMem->uid == pCheckInfo->tableId.uid) { // check uid
|
||||||
pCheckInfo->iiter =
|
pCheckInfo->iiter =
|
||||||
tSkipListCreateIterFromVal(ptd->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
tSkipListCreateIterFromVal(pIMem->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,8 +360,17 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
"-%" PRId64 ", lastKey:%" PRId64 ", %p",
|
||||||
|
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast,
|
||||||
|
pCheckInfo->lastKey, pHandle->qinfo);
|
||||||
|
|
||||||
|
if (ASCENDING_TRAVERSE(order)) {
|
||||||
|
assert(pCheckInfo->lastKey <= key);
|
||||||
|
} else {
|
||||||
|
assert(pCheckInfo->lastKey >= key);
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
|
tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
|
||||||
pHandle->qinfo);
|
pHandle->qinfo);
|
||||||
|
@ -361,8 +382,16 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||||
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
|
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||||
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
|
"-%" PRId64 ", lastKey:%" PRId64 ", %p",
|
||||||
|
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast,
|
||||||
|
pCheckInfo->lastKey, pHandle->qinfo);
|
||||||
|
|
||||||
|
if (ASCENDING_TRAVERSE(order)) {
|
||||||
|
assert(pCheckInfo->lastKey <= key);
|
||||||
|
} else {
|
||||||
|
assert(pCheckInfo->lastKey >= key);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
|
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
|
||||||
pHandle->qinfo);
|
pHandle->qinfo);
|
||||||
|
@ -2033,7 +2062,9 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
|
||||||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||||
|
|
||||||
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
|
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
|
||||||
taosArrayPush(list, pTable);
|
|
||||||
|
STableKeyInfo info = {.pTable = *pTable, .lastKey = TSKEY_INITIAL_VAL};
|
||||||
|
taosArrayPush(list, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
|
@ -2089,8 +2120,8 @@ typedef struct STableGroupSupporter {
|
||||||
|
|
||||||
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||||
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
|
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
|
||||||
STable* pTable1 = *(STable**) p1;
|
STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
|
||||||
STable* pTable2 = *(STable**) p2;
|
STable* pTable2 = ((STableKeyInfo*) p2)->pTable;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
|
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
|
||||||
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
|
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
|
||||||
|
@ -2140,12 +2171,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
|
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey, STableGroupSupporter* pSupp,
|
||||||
__ext_compar_fn_t compareFn) {
|
__ext_compar_fn_t compareFn) {
|
||||||
STable* pTable = taosArrayGetP(pTableList, 0);
|
STable* pTable = taosArrayGetP(pTableList, 0);
|
||||||
|
|
||||||
SArray* g = taosArrayInit(16, POINTER_BYTES);
|
SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));
|
||||||
taosArrayPush(g, &pTable);
|
|
||||||
|
STableKeyInfo info = {.pTable = pTable, .lastKey = skey};
|
||||||
|
taosArrayPush(g, &info);
|
||||||
tsdbRefTable(pTable);
|
tsdbRefTable(pTable);
|
||||||
|
|
||||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||||
|
@ -2159,18 +2192,21 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
|
||||||
assert((*p)->type == TSDB_CHILD_TABLE);
|
assert((*p)->type == TSDB_CHILD_TABLE);
|
||||||
|
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
taosArrayPush(g, p);
|
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
|
||||||
|
taosArrayPush(g, &info1);
|
||||||
} else {
|
} else {
|
||||||
taosArrayPush(pGroups, &g); // current group is ended, start a new group
|
taosArrayPush(pGroups, &g); // current group is ended, start a new group
|
||||||
g = taosArrayInit(16, POINTER_BYTES);
|
g = taosArrayInit(16, sizeof(STableKeyInfo));
|
||||||
taosArrayPush(g, p);
|
|
||||||
|
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
|
||||||
|
taosArrayPush(g, &info1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pGroups, &g);
|
taosArrayPush(pGroups, &g);
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
|
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
|
||||||
assert(pTableList != NULL);
|
assert(pTableList != NULL);
|
||||||
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
|
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -2181,13 +2217,16 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
|
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
|
||||||
SArray* sa = taosArrayInit(size, POINTER_BYTES);
|
SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo));
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
|
||||||
STable** pTable = taosArrayGet(pTableList, i);
|
|
||||||
assert((*pTable)->type == TSDB_CHILD_TABLE);
|
|
||||||
|
|
||||||
tsdbRefTable(*pTable);
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
taosArrayPush(sa, pTable);
|
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
|
||||||
|
assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE);
|
||||||
|
|
||||||
|
tsdbRefTable(pKeyInfo->pTable);
|
||||||
|
|
||||||
|
STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
|
||||||
|
taosArrayPush(sa, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pTableGroup, &sa);
|
taosArrayPush(pTableGroup, &sa);
|
||||||
|
@ -2198,8 +2237,8 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
|
||||||
pSupp->pTagSchema = pTagSchema;
|
pSupp->pTagSchema = pTagSchema;
|
||||||
pSupp->pCols = pCols;
|
pSupp->pCols = pCols;
|
||||||
|
|
||||||
taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
|
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), pSupp, tableGroupComparFn);
|
||||||
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
|
createTableGroupImpl(pTableGroup, pTableList, size, skey, pSupp, tableGroupComparFn);
|
||||||
taosTFree(pSupp);
|
taosTFree(pSupp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2272,7 +2311,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
|
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
||||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
||||||
SColIndex* pColIndex, int32_t numOfCols) {
|
SColIndex* pColIndex, int32_t numOfCols) {
|
||||||
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
|
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
|
||||||
|
@ -2296,7 +2335,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
|
||||||
}
|
}
|
||||||
|
|
||||||
//NOTE: not add ref count for super table
|
//NOTE: not add ref count for super table
|
||||||
SArray* res = taosArrayInit(8, POINTER_BYTES);
|
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
|
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
|
||||||
|
|
||||||
// no tags and tbname condition, all child tables of this stable are involved
|
// no tags and tbname condition, all child tables of this stable are involved
|
||||||
|
@ -2308,7 +2347,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
|
||||||
}
|
}
|
||||||
|
|
||||||
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
||||||
|
|
||||||
tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
|
tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
|
@ -2351,7 +2390,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
|
||||||
|
|
||||||
doQueryTableList(pTable, res, expr);
|
doQueryTableList(pTable, res, expr);
|
||||||
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
||||||
|
|
||||||
tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
|
tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
|
||||||
pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
|
pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
|
||||||
|
@ -2365,7 +2404,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
|
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
|
||||||
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
|
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
|
||||||
|
|
||||||
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
|
||||||
|
@ -2382,9 +2421,11 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p
|
||||||
pGroupInfo->numOfTables = 1;
|
pGroupInfo->numOfTables = 1;
|
||||||
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
|
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
SArray* group = taosArrayInit(1, POINTER_BYTES);
|
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
|
||||||
|
STableKeyInfo info = {.pTable = pTable, .lastKey = startKey};
|
||||||
|
taosArrayPush(group, &info);
|
||||||
|
|
||||||
taosArrayPush(group, &pTable);
|
|
||||||
taosArrayPush(pGroupInfo->pGroupList, &group);
|
taosArrayPush(pGroupInfo->pGroupList, &group);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2401,7 +2442,7 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
|
||||||
assert(pTableIdList != NULL);
|
assert(pTableIdList != NULL);
|
||||||
size_t size = taosArrayGetSize(pTableIdList);
|
size_t size = taosArrayGetSize(pTableIdList);
|
||||||
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
|
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
|
||||||
SArray* group = taosArrayInit(1, POINTER_BYTES);
|
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
for(; i < size; ++i) {
|
for(; i < size; ++i) {
|
||||||
|
@ -2419,7 +2460,9 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRefTable(pTable);
|
tsdbRefTable(pTable);
|
||||||
taosArrayPush(group, &pTable);
|
|
||||||
|
STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
|
||||||
|
taosArrayPush(group, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbUnlockRepoMeta(tsdb) < 0) {
|
if (tsdbUnlockRepoMeta(tsdb) < 0) {
|
||||||
|
|
|
@ -104,8 +104,10 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
|
||||||
|
|
||||||
pCacheObj->totalSize -= pNode->size;
|
pCacheObj->totalSize -= pNode->size;
|
||||||
int32_t size = taosHashGetSize(pCacheObj->pHashTable);
|
int32_t size = taosHashGetSize(pCacheObj->pHashTable);
|
||||||
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
|
assert(size > 0);
|
||||||
pCacheObj->name, pNode->key, pNode->data, size, pCacheObj->totalSize, pNode->size);
|
|
||||||
|
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
|
||||||
|
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
|
||||||
|
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(pNode->data);
|
pCacheObj->freeFp(pNode->data);
|
||||||
|
@ -428,7 +430,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
|
|
||||||
if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
|
if (pCacheObj->extendLifespan && (!inTrashCan) && (!_remove)) {
|
||||||
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
|
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
|
||||||
uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime);
|
uDebug("cache:%s data:%p extend expire time: %"PRId64, pCacheObj->name, pNode->data, pNode->expireTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_remove) {
|
if (_remove) {
|
||||||
|
@ -471,9 +473,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
} else { // ref == 0
|
} else { // ref == 0
|
||||||
atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
|
atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
|
||||||
|
|
||||||
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
|
int32_t size = taosHashGetSize(pCacheObj->pHashTable);
|
||||||
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable),
|
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
|
||||||
pCacheObj->totalSize, pNode->size);
|
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
|
||||||
|
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(pNode->data);
|
pCacheObj->freeFp(pNode->data);
|
||||||
|
@ -581,7 +583,8 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
||||||
pNode->pTNodeHeader = pElem;
|
pNode->pTNodeHeader = pElem;
|
||||||
pCacheObj->numOfElemsInTrash++;
|
pCacheObj->numOfElemsInTrash++;
|
||||||
|
|
||||||
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
|
uDebug("%s key:%p, %p move to trash, numOfElem in trash:%d", pCacheObj->name, pNode->key, pNode->data,
|
||||||
|
pCacheObj->numOfElemsInTrash);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
|
@ -623,28 +626,13 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
||||||
|
|
||||||
// SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
|
|
||||||
// while (taosHashIterNext(pIter)) {
|
|
||||||
// SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
|
|
||||||
//
|
|
||||||
// int32_t c = T_REF_VAL_GET(pNode);
|
|
||||||
// if (c <= 0) {
|
|
||||||
// taosCacheReleaseNode(pCacheObj, pNode);
|
|
||||||
// } else {
|
|
||||||
// uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key,
|
|
||||||
// pNode->data, T_REF_VAL_GET(pNode));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taosHashDestroyIter(pIter);
|
|
||||||
|
|
||||||
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||||
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
||||||
|
|
||||||
// todo memory leak if there are object with refcount greater than 0 in hash table?
|
// todo memory leak if there are object with refcount greater than 0 in hash table?
|
||||||
taosHashCleanup(pCacheObj->pHashTable);
|
taosHashCleanup(pCacheObj->pHashTable);
|
||||||
taosTrashCanEmpty(pCacheObj, true);
|
taosTrashCanEmpty(pCacheObj, true);
|
||||||
|
|
||||||
__cache_lock_destroy(pCacheObj);
|
__cache_lock_destroy(pCacheObj);
|
||||||
|
|
||||||
taosTFree(pCacheObj->name);
|
taosTFree(pCacheObj->name);
|
||||||
|
|
|
@ -66,9 +66,19 @@ if $row != 100 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select last(c2) from tb_tb9
|
sql select last(*) from tb_tb9
|
||||||
if $row != 1 then
|
if $row != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql select last(c2) from tb_tb9
|
||||||
|
if $row != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select first(c2), last(c2) from tb_tb9
|
||||||
|
if $row != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue