more work

This commit is contained in:
Hongze Cheng 2022-06-20 02:38:08 +00:00
parent 0767b57e59
commit 21ea4e3a0b
6 changed files with 87 additions and 95 deletions

View File

@ -70,6 +70,10 @@ int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader); int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData); int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode);
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
@ -127,11 +131,6 @@ void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, in
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
// tq // tq
typedef struct STqReadHandle STqReadHandle; typedef struct STqReadHandle STqReadHandle;

View File

@ -345,7 +345,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid); tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);

View File

@ -25,8 +25,6 @@
#define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2 #define SL_MOVE_FROM_POS 0x2
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,

View File

@ -2375,51 +2375,6 @@ _err:
// return numOfRows; // return numOfRows;
// } // }
// void* tsdbGetIdx(SMeta* pMeta) {
// if (pMeta == NULL) {
// return NULL;
// }
// return metaGetIdx(pMeta);
// }
// void* tsdbGetIvtIdx(SMeta* pMeta) {
// if (pMeta == NULL) {
// return NULL;
// }
// return metaGetIvtIdx(pMeta);
// }
// int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
// SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
// while (1) {
// tb_uid_t id = metaCtbCursorNext(pCur);
// if (id == 0) {
// break;
// }
// STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
// taosArrayPush(list, &info);
// }
// metaCloseCtbCursor(pCur);
// return TSDB_CODE_SUCCESS;
// }
// int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
// SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, suid);
// while (1) {
// tb_uid_t id = metaCtbCursorNext(pCur);
// if (id == 0) {
// break;
// }
// taosArrayPush(list, &id);
// }
// metaCloseCtbCursor(pCur);
// return TSDB_CODE_SUCCESS;
// }
// static void destroyHelper(void* param) { // static void destroyHelper(void* param) {
// if (param == NULL) { // if (param == NULL) {
// return; // return;

View File

@ -148,4 +148,51 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
if (vgId) { if (vgId) {
*vgId = TD_VID(pVnode); *vgId = TD_VID(pVnode);
} }
} }
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
taosArrayPush(list, &info);
}
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
void *vnodeGetIdx(SVnode *pVnode) {
if (pVnode == NULL) {
return NULL;
}
return metaGetIdx(pVnode->pMeta);
}
void *vnodeGetIvtIdx(SVnode *pVnode) {
if (pVnode == NULL) {
return NULL;
}
return metaGetIvtIdx(pVnode->pMeta);
}

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "index.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "index.h"
#include "os.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tmsg.h" #include "tmsg.h"
@ -25,45 +25,41 @@
#include "executorimpl.h" #include "executorimpl.h"
#include "tcompression.h" #include "tcompression.h"
void initResultRowInfo(SResultRowInfo *pResultRowInfo) { void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->cur.pageId = -1; pResultRowInfo->cur.pageId = -1;
} }
void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
if (pResultRowInfo == NULL) { if (pResultRowInfo == NULL) {
return; return;
} }
for(int32_t i = 0; i < pResultRowInfo->size; ++i) { for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// if (pResultRowInfo->pResult[i]) { // if (pResultRowInfo->pResult[i]) {
// taosMemoryFreeClear(pResultRowInfo->pResult[i]->key); // taosMemoryFreeClear(pResultRowInfo->pResult[i]->key);
// } // }
} }
} }
void closeAllResultRows(SResultRowInfo *pResultRowInfo) { void closeAllResultRows(SResultRowInfo* pResultRowInfo) {
// do nothing // do nothing
} }
bool isResultRowClosed(SResultRow* pRow) { bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
return (pRow->closed == true);
}
void closeResultRow(SResultRow* pResultRow) { void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
pResultRow->closed = true;
}
// TODO refactor: use macro // TODO refactor: use macro
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) { SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
assert(index >= 0 && offset != NULL); assert(index >= 0 && offset != NULL);
return (SResultRowEntryInfo*)((char*) pRow->pEntryInfo + offset[index]); return (SResultRowEntryInfo*)((char*)pRow->pEntryInfo + offset[index]);
} }
size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow); int32_t rowSize = (numOfOutput * sizeof(SResultRowEntryInfo)) + sizeof(SResultRow);
for(int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
rowSize += pCtx[i].resDataInfo.interBufSize; rowSize += pCtx[i].resDataInfo.interBufSize;
} }
@ -74,31 +70,29 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
assert(pGroupResInfo != NULL); assert(pGroupResInfo != NULL);
taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->pRows = NULL; pGroupResInfo->pRows = NULL;
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
} }
static int32_t resultrowComparAsc(const void* p1, const void* p2) { static int32_t resultrowComparAsc(const void* p1, const void* p2) {
SResKeyPos* pp1 = *(SResKeyPos**) p1; SResKeyPos* pp1 = *(SResKeyPos**)p1;
SResKeyPos* pp2 = *(SResKeyPos**) p2; SResKeyPos* pp2 = *(SResKeyPos**)p2;
if (pp1->groupId == pp2->groupId) { if (pp1->groupId == pp2->groupId) {
int64_t pts1 = *(int64_t*) pp1->key; int64_t pts1 = *(int64_t*)pp1->key;
int64_t pts2 = *(int64_t*) pp2->key; int64_t pts2 = *(int64_t*)pp2->key;
if (pts1 == pts2) { if (pts1 == pts2) {
return 0; return 0;
} else { } else {
return pts1 < pts2? -1:1; return pts1 < pts2 ? -1 : 1;
} }
} else { } else {
return pp1->groupId < pp2->groupId? -1:1; return pp1->groupId < pp2->groupId ? -1 : 1;
} }
} }
static int32_t resultrowComparDesc(const void* p1, const void* p2) { static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
return resultrowComparAsc(p2, p1);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) { void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) {
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
@ -110,20 +104,20 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES); pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
size_t keyLen = 0; size_t keyLen = 0;
while((pData = taosHashIterate(pHashmap, pData)) != NULL) { while ((pData = taosHashIterate(pHashmap, pData)) != NULL) {
void* key = taosHashGetKey(pData, &keyLen); void* key = taosHashGetKey(pData, &keyLen);
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition)); SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
p->groupId = *(uint64_t*) key; p->groupId = *(uint64_t*)key;
p->pos = *(SResultRowPosition*) pData; p->pos = *(SResultRowPosition*)pData;
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t)); memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
taosArrayPush(pGroupResInfo->pRows, &p); taosArrayPush(pGroupResInfo->pRows, &p);
} }
if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) { if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
__compar_fn_t fn = (order == TSDB_ORDER_ASC)? resultrowComparAsc:resultrowComparDesc; __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn); qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn);
} }
@ -155,7 +149,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return 0; return 0;
} }
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows); return (int32_t)taosArrayGetSize(pGroupResInfo->pRows);
} }
SArray* createSortInfo(SNodeList* pNodeList) { SArray* createSortInfo(SNodeList* pNodeList) {
@ -223,7 +217,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
if (pScanNode->tableType == TSDB_SUPER_TABLE) { if (pScanNode->tableType == TSDB_SUPER_TABLE) {
if (pTagCond) { if (pTagCond) {
SIndexMetaArg metaArg = { SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid}; .metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t)); SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res); code = doFilterTag(pTagCond, &metaArg, res);
@ -244,7 +238,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
taosArrayDestroy(res); taosArrayDestroy(res);
} else { } else {
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
} }
} else { // Create one table group. } else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = tableUid}; STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
@ -255,7 +249,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
SArray* extractPartitionColInfo(SNodeList* pNodeList) { SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if(!pNodeList) { if (!pNodeList) {
return NULL; return NULL;
} }
@ -284,7 +278,6 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
return pList; return pList;
} }
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type) { int32_t type) {
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
@ -569,8 +562,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {
(*rowEntryInfoOffset)[i] = (*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
(int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); pFuncCtx[i - 1].resDataInfo.interBufSize);
} }
setSelectValueColumnInfo(pFuncCtx, numOfOutput); setSelectValueColumnInfo(pFuncCtx, numOfOutput);