diff --git a/cmake/bdb_CMakeLists.txt.in b/cmake/bdb_CMakeLists.txt.in.bak similarity index 100% rename from cmake/bdb_CMakeLists.txt.in rename to cmake/bdb_CMakeLists.txt.in.bak diff --git a/cmake/cmake.options b/cmake/cmake.options index 946eb5d258..b51f096185 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -55,17 +55,6 @@ option( OFF ) -IF(${TD_WINDOWS}) - MESSAGE("Not build BDB on Windows") -ELSE () - option( - BUILD_WITH_BDB - "If build with BerkleyDB" - ON - ) - -ENDIF () - option( BUILD_WITH_LUCENE "If build with lucene" diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 575a0e6274..377e3fbccc 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -63,9 +63,9 @@ if(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV}) # bdb -if(${BUILD_WITH_BDB}) - cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${BUILD_WITH_BDB}) +#if(${BUILD_WITH_BDB}) + #cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +#endif(${BUILD_WITH_BDB}) # sqlite if(${BUILD_WITH_SQLITE}) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index eacaeb9524..740488b39b 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -7,9 +7,9 @@ if(${BUILD_WITH_LUCENE}) add_subdirectory(lucene) endif(${BUILD_WITH_LUCENE}) -if(${BUILD_WITH_BDB}) - add_subdirectory(bdb) -endif(${BUILD_WITH_BDB}) +#if(${BUILD_WITH_BDB}) + #add_subdirectory(bdb) +#endif(${BUILD_WITH_BDB}) if(${BUILD_WITH_SQLITE}) add_subdirectory(sqlite) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f2fec58d8f..bf6a804e7f 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -71,7 +71,7 @@ typedef struct SDataBlockInfo { int64_t uid; int64_t blockId; }; - int64_t groupId; // no need to serialize + uint64_t groupId; // no need to serialize } SDataBlockInfo; typedef struct SSDataBlock { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 7978430027..98edd9f0a5 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -182,6 +182,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); +int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity); SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1b4c6ea6df..c7251a26b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -512,6 +512,7 @@ typedef struct { int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; + int32_t ttl; int8_t walLevel; int8_t precision; // time resolution int8_t compression; @@ -521,6 +522,7 @@ typedef struct { int8_t cacheLastRow; int8_t ignoreExist; int8_t streamMode; + int8_t singleSTable; int32_t numOfRetensions; SArray* pRetensions; // SRetention } SCreateDbReq; @@ -586,6 +588,41 @@ int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp); void tFreeSUsedbRsp(SUseDbRsp* pRsp); +typedef struct { + char db[TSDB_DB_FNAME_LEN]; +} SDbCfgReq; + +int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq); +int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq); + +typedef struct { + int32_t numOfVgroups; + int32_t cacheBlockSize; + int32_t totalBlocks; + int32_t daysPerFile; + int32_t daysToKeep0; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t minRows; + int32_t maxRows; + int32_t commitTime; + int32_t fsyncPeriod; + int32_t ttl; + int8_t walLevel; + int8_t precision; + int8_t compression; + int8_t replications; + int8_t quorum; + int8_t update; + int8_t cacheLastRow; + int8_t streamMode; + int8_t singleSTable; +} SDbCfgRsp; + +int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp); +int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp); + + typedef struct { int32_t rowNum; } SQnodeListReq; @@ -2285,6 +2322,7 @@ typedef struct { char cgroup[TSDB_CGROUP_LEN]; int64_t currentOffset; + uint64_t reqId; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqPollReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3aecca40b2..6010c80f31 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -155,6 +155,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 9f0d4b11c2..a0b342fca2 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -77,6 +77,8 @@ typedef struct SDbVgVersion { int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SDbVgVersion; +typedef SDbCfgRsp SDbCfgInfo; + int32_t catalogInit(SCatalogCfg *cfg); /** @@ -217,6 +219,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); +int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); + /** * Destroy catalog and relase all resources diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 28e5483552..39448ceef3 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -30,11 +30,19 @@ extern "C" { #define FOREACH(node, list) \ for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) -// only be use in FOREACH -#define ERASE_NODE(list) cell = nodesListErase(list, cell); - #define REPLACE_NODE(newNode) cell->pNode = (SNode*)(newNode) +#define INSERT_LIST(target, src) nodesListInsertList((target), cell, src) + +#define WHERE_EACH(node, list) \ + SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \ + while (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)) + +#define WHERE_NEXT cell = cell->pNext + +// only be use in WHERE_EACH +#define ERASE_NODE(list) cell = nodesListErase((list), cell) + #define FORBOTH(node1, list1, node2, list2) \ for (SListCell* cell1 = (NULL != (list1) ? (list1)->pHead : NULL), *cell2 = (NULL != (list2) ? (list2)->pHead : NULL); \ (NULL == cell1 ? (node1 = NULL, false) : (node1 = cell1->pNode, true)), (NULL == cell2 ? (node2 = NULL, false) : (node2 = cell2->pNode, true)), (node1 != NULL && node2 != NULL); \ @@ -202,7 +210,9 @@ int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode); int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode); int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc); int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc); +int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); +void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc); SNodeptr nodesListGetNode(SNodeList* pList, int32_t index); void nodesDestroyList(SNodeList* pList); // Only clear the linked list structure, without releasing the elements inside diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a6e5fee2d1..5893a14bd5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1149,6 +1149,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; + pReq->reqId = generateRequestId(); pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqPollReq)); @@ -1279,7 +1280,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { .len = sizeof(SMqPollReq), .handle = NULL, }; - sendInfo->requestId = generateRequestId(); + sendInfo->requestId = pReq->reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqPollCb; @@ -1288,7 +1289,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - tscDebug("consumer %ld send poll: vg %d, epoch %d, req offset %ld", tmq->consumerId, pVg->vgId, tmq->epoch, pVg->currentOffset); + tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fb6640e59e..8f55488e2c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -415,10 +415,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL); - if (isNull) { - // do nothing - } else { + if (pColInfoData->varmeta.offset[j] != -1) { char* p = colDataGetData(pColInfoData, j); size += varDataTLen(p); } @@ -547,8 +544,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); - size_t metaSize = pBlock->info.rows * sizeof(int32_t); if (IS_VAR_DATA_TYPE(pCol->info.type)) { + size_t metaSize = pBlock->info.rows * sizeof(int32_t); memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; } else { @@ -581,6 +578,49 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { return TSDB_CODE_SUCCESS; } +int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { + pBlock->info.rows = *(int32_t*)buf; + + int32_t numOfCols = pBlock->info.numOfCols; + const char* pStart = buf + sizeof(uint32_t); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + size_t metaSize = capacity * sizeof(int32_t); + memcpy(pCol->varmeta.offset, pStart, metaSize); + pStart += metaSize; + } else { + memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity)); + pStart += BitmapLen(capacity); + } + + int32_t colLength = *(int32_t*)pStart; + pStart += sizeof(int32_t); + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + if (pCol->varmeta.allocLen < colLength) { + char* tmp = taosMemoryRealloc(pCol->pData, colLength); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pCol->pData = tmp; + pCol->varmeta.allocLen = colLength; + } + + pCol->varmeta.length = colLength; + ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen); + } + + memcpy(pCol->pData, pStart, colLength); + pStart += pCol->info.bytes * capacity; + } + + return TSDB_CODE_SUCCESS; +} + size_t blockDataGetRowSize(SSDataBlock* pBlock) { ASSERT(pBlock != NULL); if (pBlock->info.rowSize == 0) { @@ -627,6 +667,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { return rowSize; } +int32_t getAllowedRowsForPage(const SSDataBlock* pBlock, size_t pgSize) { + return (int32_t) ((pgSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); +} + typedef struct SSDataBlockSortHelper { SArray* orderInfo; // SArray SSDataBlock* pDataBlock; @@ -1178,6 +1222,9 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t code = 0; + if (numOfRows == 0) { + return TSDB_CODE_SUCCESS; + } for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); @@ -1220,7 +1267,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); + return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); } void colDataDestroy(SColumnInfoData* pColData) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4ecacb9d9d..b351536839 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -394,7 +394,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeQueryThreads = tsNumOfCores / 2; - tsNumOfVnodeQueryThreads = TMIN(tsNumOfVnodeQueryThreads, 1); + tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeFetchThreads = tsNumOfCores / 2; @@ -402,11 +402,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; - tsNumOfVnodeWriteThreads = TMIN(tsNumOfVnodeWriteThreads, 1); + tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeSyncThreads = tsNumOfCores / 2; - tsNumOfVnodeSyncThreads = TMIN(tsNumOfVnodeSyncThreads, 1); + tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeMergeThreads = tsNumOfCores / 8; @@ -414,7 +414,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "numOfVnodeMergeThreads", tsNumOfVnodeMergeThreads, 1, 1024, 0) != 0) return -1; tsNumOfQnodeQueryThreads = tsNumOfCores / 2; - tsNumOfQnodeQueryThreads = TMIN(tsNumOfQnodeQueryThreads, 1); + tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; tsNumOfQnodeFetchThreads = tsNumOfCores / 2; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index acb77648ae..57a8953253 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { if (tEncodeI32(&encoder, pReq->maxRows) < 0) return -1; if (tEncodeI32(&encoder, pReq->commitTime) < 0) return -1; if (tEncodeI32(&encoder, pReq->fsyncPeriod) < 0) return -1; + if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1; if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->precision) < 0) return -1; if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; @@ -1524,6 +1525,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1; if (tEncodeI8(&encoder, pReq->ignoreExist) < 0) return -1; if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1; + if (tEncodeI8(&encoder, pReq->singleSTable) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention *pRetension = taosArrayGet(pReq->pRetensions, i); @@ -1556,6 +1558,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) if (tDecodeI32(&decoder, &pReq->maxRows) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commitTime) < 0) return -1; if (tDecodeI32(&decoder, &pReq->fsyncPeriod) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1; if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->precision) < 0) return -1; if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; @@ -1565,6 +1568,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1; if (tDecodeI8(&decoder, &pReq->ignoreExist) < 0) return -1; if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->singleSTable) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1; pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention)); if (pReq->pRetensions == NULL) { @@ -1942,6 +1946,94 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) { taosArrayDestroy(pRsp->pArray); } +int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->cacheBlockSize) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->totalBlocks) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysToKeep0) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysToKeep1) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->daysToKeep2) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->commitTime) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->fsyncPeriod) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->quorum) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->update) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->cacheLastRow) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->streamMode) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->cacheBlockSize) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->totalBlocks) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysToKeep0) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysToKeep1) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->daysToKeep2) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->commitTime) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->fsyncPeriod) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->quorum) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->update) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->cacheLastRow) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->streamMode) < 0) return -1; + + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + + int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index 3c8dbc5d55..eb72106e4b 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -178,6 +178,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE); // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 38ef1185e8..38dbdc4799 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -260,6 +260,7 @@ typedef struct { int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; + int32_t ttl; int8_t walLevel; int8_t precision; int8_t compression; @@ -268,6 +269,7 @@ typedef struct { int8_t update; int8_t cacheLastRow; int8_t streamMode; + int8_t singleSTable; int32_t numOfRetensions; SArray* pRetensions; } SDbCfg; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 44c547bec3..ff2ea162fb 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -40,6 +40,7 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq); static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); +static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); int32_t mndInitDb(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_DB, @@ -56,6 +57,7 @@ int32_t mndInitDb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq); mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq); mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq); + mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs); @@ -268,6 +270,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { if (pCfg->minRows > pCfg->maxRows) return -1; if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1; if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1; + if (pCfg->ttl < TSDB_MIN_DB_TTL_OPTION && pCfg->ttl != TSDB_DEFAULT_DB_TTL_OPTION) return -1; if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1; if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1; if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1; @@ -278,6 +281,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1; if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1; if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1; + if (pCfg->singleSTable < TSDB_MIN_DB_SINGLE_STABLE_OPTION || pCfg->streamMode > TSDB_MAX_DB_SINGLE_STABLE_OPTION) return -1; return TSDB_CODE_SUCCESS; } @@ -293,6 +297,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK; if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME; if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; + if (pCfg->ttl < 0) pCfg->ttl = TSDB_DEFAULT_DB_TTL_OPTION; if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL; if (pCfg->precision < 0) pCfg->precision = TSDB_DEFAULT_PRECISION; if (pCfg->compression < 0) pCfg->compression = TSDB_DEFAULT_COMP_LEVEL; @@ -301,6 +306,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE; + if (pCfg->singleSTable < 0) pCfg->singleSTable = TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION; if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0; } @@ -437,6 +443,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate .maxRows = pCreate->maxRows, .commitTime = pCreate->commitTime, .fsyncPeriod = pCreate->fsyncPeriod, + .ttl = pCreate->ttl, .walLevel = pCreate->walLevel, .precision = pCreate->precision, .compression = pCreate->compression, @@ -445,6 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate .update = pCreate->update, .cacheLastRow = pCreate->cacheLastRow, .streamMode = pCreate->streamMode, + .singleSTable = pCreate->singleSTable, }; dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; @@ -730,6 +738,71 @@ ALTER_DB_OVER: return code; } +static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq) { + SMnode *pMnode = pReq->pNode; + int32_t code = -1; + SDbObj *pDb = NULL; + SDbCfgReq cfgReq = {0}; + SDbCfgRsp cfgRsp = {0}; + + if (tDeserializeSDbCfgReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto GET_DB_CFG_OVER; + } + + pDb = mndAcquireDb(pMnode, cfgReq.db); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + goto GET_DB_CFG_OVER; + } + + cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups; + cfgRsp.cacheBlockSize = pDb->cfg.cacheBlockSize; + cfgRsp.totalBlocks = pDb->cfg.totalBlocks; + cfgRsp.daysPerFile = pDb->cfg.daysPerFile; + cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0; + cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1; + cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2; + cfgRsp.minRows = pDb->cfg.minRows; + cfgRsp.maxRows = pDb->cfg.maxRows; + cfgRsp.commitTime = pDb->cfg.commitTime; + cfgRsp.fsyncPeriod = pDb->cfg.fsyncPeriod; + cfgRsp.ttl = pDb->cfg.ttl; + cfgRsp.walLevel = pDb->cfg.walLevel; + cfgRsp.precision = pDb->cfg.precision; + cfgRsp.compression = pDb->cfg.compression; + cfgRsp.replications = pDb->cfg.replications; + cfgRsp.quorum = pDb->cfg.quorum; + cfgRsp.update = pDb->cfg.update; + cfgRsp.cacheLastRow = pDb->cfg.cacheLastRow; + cfgRsp.streamMode = pDb->cfg.streamMode; + cfgRsp.singleSTable = pDb->cfg.singleSTable; + + int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp); + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto GET_DB_CFG_OVER; + } + + tSerializeSDbCfgRsp(pRsp, contLen, &cfgRsp); + + pReq->pRsp = pRsp; + pReq->rspLen = contLen; + +GET_DB_CFG_OVER: + + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to get cfg since %s", cfgReq.db, terrstr()); + } + + mndReleaseDb(pMnode, pDb); + + return code; +} + + static int32_t mndSetDropDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdbRaw *pRedoRaw = mndDbActionEncode(pDb); if (pRedoRaw == NULL) return -1; @@ -1509,6 +1582,23 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_ STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); cols++; + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.ttl; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.singleSTable; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.streamMode; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char *status = "ready"; + STR_WITH_SIZE_TO_VARSTR(pWrite, status, strlen(status)); + cols++; + // pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); // *(int8_t *)pWrite = pDb->cfg.update; } diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 92b854157b..aadd914439 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -63,7 +63,11 @@ static const SInfosTableSchema userDBSchema[] = { {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - {.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "stream_mode", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, // {.name = "update", .bytes = 1, .type = // TSDB_DATA_TYPE_TINYINT}, // disable update }; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8078338238..dbd5e43b6d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -60,7 +60,7 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName); static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName); int32_t mndInitSubscribe(SMnode *pMnode) { @@ -102,12 +102,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { +static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* topicName) { SMqMVRebReq req = { .vgId = pConsumerEp->vgId, .oldConsumerId = pConsumerEp->oldConsumerId, .newConsumerId = pConsumerEp->consumerId, }; + req.topic = strdup(topicName); int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); @@ -122,6 +123,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqMVRebReq(&abuf, &req); + taosMemoryFree(req.topic); *pBuf = buf; *pLen = tlen; @@ -129,12 +131,12 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume return 0; } -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName) { ASSERT(pConsumerEp->oldConsumerId != -1); void *buf; int32_t tlen; - if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) { + if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) { return -1; } @@ -502,10 +504,10 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { pConsumerEp->epoch = 0; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup); if (pConsumerEp->oldConsumerId == -1) { - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId, @@ -517,7 +519,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic); } } } @@ -849,7 +851,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); } else { - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); + mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName); } // to trigger rebalance at once, do not set status active /*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/ diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 28e19dc017..b5fe7d460f 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -68,7 +68,7 @@ target_link_libraries( PUBLIC executor PUBLIC scheduler PUBLIC tdb - PUBLIC bdb + #PUBLIC bdb PUBLIC transport PUBLIC stream ) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28d71fb842..f168695383 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } - vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch); + vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -334,12 +334,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); break; } - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -363,8 +361,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -393,8 +390,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, - pHead->msgType, consumerId, pReq->epoch); + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -425,8 +421,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, - pReq->epoch); + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch); /*}*/ return 0; @@ -437,7 +432,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; tDecodeSMqMVRebReq(msg, &req); - vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId); + vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); ASSERT(pConsumer->consumerId == req.oldConsumerId); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a18374015b..fee8bdf27e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -167,8 +167,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - if (colDataAppend(pColData, curRow, sVal.val, false) < 0) { - /*if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {*/ + /*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/ + if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e1f5332899..6ed09ce7eb 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -569,6 +569,44 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE return TSDB_CODE_SUCCESS; } +int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *dbFName, SDbCfgInfo *out) { + char *msg = NULL; + int32_t msgLen = 0; + + ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)]((void *)dbFName, &msg, 0, &msgLen); + if (code) { + ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName); + CTG_ERR_RET(code); + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_GET_DB_CFG, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rpcRsp.code), dbFName); + CTG_ERR_RET(rpcRsp.code); + } + + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)](out, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process get db cfg rsp failed, code:%x, db:%s", code, dbFName); + CTG_ERR_RET(code); + } + + ctgDebug("Got db cfg from mnode, dbFName:%s", dbFName); + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { if (NULL == pCtg->dbCache) { *exist = 0; @@ -2137,7 +2175,6 @@ _return: CTG_RET(code); } - int32_t catalogInit(SCatalogCfg *cfg) { if (gCtgMgmt.pCluster) { qError("catalog already initialized"); @@ -2717,6 +2754,15 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); } +int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg)); +} void catalogDestroy(void) { qInfo("start to destroy catalog"); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 2a36113749..605d8f41da 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -618,7 +618,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 57edc40007..35a4b93a88 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -511,6 +511,12 @@ typedef struct SProjectOperatorInfo { SSDataBlock *existDataBlock; SArray *pPseudoColInfo; SLimit limit; + SLimit slimit; + + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; + int64_t curOffset; int64_t curOutput; } SProjectOperatorInfo; @@ -563,6 +569,29 @@ typedef struct SGroupbyOperatorInfo { SAggSupporter aggSup; } SGroupbyOperatorInfo; +typedef struct SDataGroupInfo { + uint64_t groupId; + int64_t numOfRows; + SArray *pPageList; +} SDataGroupInfo; + +// The sort in partition may be needed later. +typedef struct SPartitionOperatorInfo { + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group +} SPartitionOperatorInfo; + typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -650,7 +679,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, @@ -667,8 +696,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); - +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 34894c235b..74ec4e55a6 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -209,6 +209,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { assert(pDispatcher->queryEnd); pOutput->useconds = pDispatcher->useconds; pOutput->precision = pDispatcher->pSchema->precision; + pOutput->bufStatus = DS_BUF_EMPTY; + pOutput->queryEnd = pDispatcher->queryEnd; return TSDB_CODE_SUCCESS; } SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 54e8e6ae8b..e85cfb76fd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -302,6 +302,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { } taosArrayPush(pBlock->pDataBlock, &idata); + + if (IS_VAR_DATA_TYPE(idata.info.type)) { + pBlock->info.hasVarCol = true; + } } return pBlock; @@ -1259,6 +1263,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList) { setPseudoOutputColInfo(pResult, pCtx, pPseudoList); + pResult->info.groupId = pSrcBlock->info.groupId; for (int32_t k = 0; k < numOfOutput; ++k) { if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query @@ -5422,7 +5427,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { - assert(*newgroup == false); *newgroup = prevVal; setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); break; @@ -5450,6 +5454,38 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo); + if (pProjectInfo->curSOffset > 0) { + if (pProjectInfo->groupId == 0) { // it is the first group + pProjectInfo->groupId = pBlock->info.groupId; + blockDataCleanup(pInfo->pRes); + continue; + } else if (pProjectInfo->groupId != pBlock->info.groupId) { + pProjectInfo->curSOffset -= 1; + + // ignore data block in current group + if (pProjectInfo->curSOffset > 0) { + blockDataCleanup(pInfo->pRes); + continue; + } + } + + pProjectInfo->groupId = pBlock->info.groupId; + } + + if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) { + pProjectInfo->curGroupOutput += 1; + if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + // reset the value for a new group data + pProjectInfo->curOffset = 0; + pProjectInfo->curOutput = 0; + } + + pProjectInfo->groupId = pBlock->info.groupId; + // todo extract method if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) { blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset); @@ -6317,7 +6353,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) } SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, - SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -6325,7 +6361,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p } pInfo->limit = *pLimit; + pInfo->slimit = *pSlimit; pInfo->curOffset = pLimit->offset; + pInfo->curSOffset = pSlimit->offset; + pInfo->binfo.pRes = pResBlock; int32_t numOfCols = num; @@ -7052,11 +7091,13 @@ static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); static SArray* createSortInfo(SNodeList* pNodeList); +static SArray* extractPartitionColInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { - if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { + int32_t type = nodeType(pPhyNode); + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; int32_t numOfCols = 0; @@ -7064,11 +7105,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); @@ -7081,7 +7122,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); @@ -7097,93 +7138,76 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } } - if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + int32_t type = nodeType(pPhyNode); + size_t size = LIST_LENGTH(pPhyNode->pChildren); + ASSERT(size == 1); - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { int32_t num = 0; SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; + SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; + return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { + int32_t num = 0; - return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - - int32_t num = 0; - - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - - if (pAggNode->pGroupKeys != NULL) { - SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); - } else { - return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); - } + if (pAggNode->pGroupKeys != NULL) { + SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); + return createGroupOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pTaskInfo, NULL); + } else { + return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } - } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - - SInterval interval = { - .interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = pIntervalPhyNode->precision - }; - - int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId; - return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); - } - } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SInterval interval = { + .interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = pIntervalPhyNode->precision + }; + int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId; + return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SArray* info = createSortInfo(pSortPhyNode->pSortKeys); + + SArray* info = createSortInfo(pSortPhyNode->pSortKeys); return createSortOperatorInfo(op, pResBlock, info, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) { - size_t size = LIST_LENGTH(pPhyNode->pChildren); - assert(size == 1); - - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { + SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode; + SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); + + return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { @@ -7266,11 +7290,38 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + // todo extract method SColumn c = {0}; c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.precision = pColNode->node.resType.precision; + c.scale = pColNode->node.resType.scale; + + taosArrayPush(pList, &c); + } + + return pList; +} + +SArray* extractPartitionColInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i); + + // todo extract method + SColumn c = {0}; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; c.precision = pColNode->node.resType.precision; c.scale = pColNode->node.resType.scale; @@ -7289,23 +7340,13 @@ SArray* createSortInfo(SNodeList* pNodeList) { } for (int32_t i = 0; i < numOfCols; ++i) { - STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); - SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode->pExpr; + SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i); SBlockOrderInfo bi = {0}; bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST); SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; bi.slotId = pColNode->slotId; - // pColNode->order; - // SColumn c = {0}; - // c.slotId = pColNode->slotId; - // c.colId = pColNode->colId; - // c.type = pColNode->node.resType.type; - // c.bytes = pColNode->node.resType.bytes; - // c.precision = pColNode->node.resType.precision; - // c.scale = pColNode->node.resType.scale; - taosArrayPush(pList, &bi); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b3a8e09f16..86c2ad4f21 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -25,7 +25,11 @@ #include "thash.h" #include "ttypes.h" -static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { +static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); +static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); +static uint64_t calcGroupId(char* pData, int32_t len); + +static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosMemoryFreeClear(pInfo->keyBuf); @@ -33,44 +37,43 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SGroupbyOperatorInfo* pInfo, SArray* pGroupColList) { - pInfo->pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pGroupColVals == NULL) { +static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = taosArrayGet(pGroupColList, i); - pInfo->groupKeyLen += pCol->bytes; + (*keyLen) += pCol->bytes; struct SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; + key.bytes = pCol->bytes; + key.type = pCol->type; key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); + key.pData = taosMemoryCalloc(1, pCol->bytes); if (key.pData == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pInfo->pGroupColVals, &key); + taosArrayPush((*pGroupColVals), &key); } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - pInfo->keyBuf = taosMemoryCalloc(1, pInfo->groupKeyLen + nullFlagSize); - if (pInfo->keyBuf == NULL) { + (*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize); + if ((*keyBuf) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; } -static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, - int32_t numOfGroupCols) { +static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? @@ -78,7 +81,7 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg); - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); if (pkey->isNull && isNull) { continue; } @@ -106,18 +109,18 @@ static bool groupKeyCompare(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, in return true; } -static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { +static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pInfo->pGroupCols, i); + SColumn* pCol = taosArrayGet(pGroupCols, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg != NULL) { pColAgg = &pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } - SGroupKeys* pkey = taosArrayGet(pInfo->pGroupColVals, i); + SGroupKeys* pkey = taosArrayGet(pGroupColVals, i); if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { pkey->isNull = true; } else { @@ -197,13 +200,13 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { for (int32_t j = 0; j < pBlock->info.rows; ++j) { // Compare with the previous row of this column, and do not set the output buffer again if they are identical. if (!pInfo->isInit) { - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); pInfo->isInit = true; num++; continue; } - bool equal = groupKeyCompare(pInfo, pBlock, j, numOfGroupCols); + bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); if (equal) { num++; continue; @@ -212,7 +215,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // The first row of a new block does not belongs to the previous existed group if (!equal && j == 0) { num++; - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); continue; } @@ -227,7 +230,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); - recordNewGroupKeys(pInfo, pBlock, j, numOfGroupCols); + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); num = 1; } @@ -259,7 +262,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pRes; + return (pRes->info.rows == 0)? NULL:pRes; } int32_t order = TSDB_ORDER_ASC; @@ -309,7 +312,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou } } - return pInfo->binfo.pRes; + return (pRes->info.rows == 0)? NULL:pRes; } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, @@ -325,7 +328,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - int32_t code = initGroupOptrInfo(pInfo, pGroupColList); + int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -339,7 +342,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; + pOperator->closeFn = destroyGroupOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -351,67 +354,263 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { +static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { +// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SPartitionOperatorInfo* pInfo = pOperator->info; + + int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); + int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); + + SDataGroupInfo* pGInfo = NULL; + void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len); + + pGInfo->numOfRows += 1; + if (pGInfo->groupId == 0) { + pGInfo->groupId = calcGroupId(pInfo->keyBuf, len); + } + + int32_t* rows = (int32_t*) pPage; + + size_t numOfCols = pOperator->numOfOutput; + for(int32_t i = 0; i < numOfCols; ++i) { + SExprInfo* pExpr = &pOperator->pExpr[i]; + int32_t slotId = pExpr->base.pParam[0].pCol->slotId; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t bytes = pColInfoData->info.bytes; + int32_t startOffset = pInfo->columnOffset[i]; + + char* columnLen = NULL; + int32_t contentLen = 0; + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + int32_t* offset = pPage + startOffset; + columnLen = pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity; + char* data = (char*)(columnLen + sizeof(int32_t)); + + if (colDataIsNull_s(pColInfoData, j)) { + offset[(*rows)] = -1; + contentLen = 0; + } else { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + memcpy(data + (*columnLen), src, varDataTLen(src)); + contentLen = varDataTLen(src); + } + } else { + char* bitmap = pPage + startOffset; + columnLen = pPage + startOffset + BitmapLen(pInfo->rowCapacity); + char* data = (char*) columnLen + sizeof(int32_t); + + bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); + if (isNull) { + colDataSetNull_f(bitmap, (*rows)); + } else { + memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); + } + contentLen = bytes; + } + + (*columnLen) += contentLen; + } + + (*rows) += 1; + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pBuf, pPage); + } +} + +void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) { + SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); + + void* pPage = NULL; + if (p == NULL) { // it is a new group + SDataGroupInfo gi = {0}; + gi.pPageList = taosArrayInit(100, sizeof(int32_t)); + taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo)); + + p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); + + int32_t pageId = 0; + pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + taosArrayPush(p->pPageList, &pageId); + + *(int32_t *) pPage = 0; + } else { + int32_t* curId = taosArrayGetLast(p->pPageList); + pPage = getBufPage(pInfo->pBuf, *curId); + + int32_t *rows = (int32_t*) pPage; + if (*rows >= pInfo->rowCapacity) { + // add a new page for current group + int32_t pageId = 0; + pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + taosArrayPush(p->pPageList, &pageId); + + *(int32_t*) pPage = 0; + } + } + + *pGroupInfo = p; + return pPage; +} + +uint64_t calcGroupId(char* pData, int32_t len) { + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pData, len); + tMD5Final(&context); + + // NOTE: only extract the initial 8 bytes of the final MD5 digest + uint64_t id = 0; + memcpy(&id, context.digest, sizeof(uint64_t)); + return id; +} + +int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { + size_t numOfCols = pBlock->info.numOfCols; + int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); + + offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format + + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int32_t bytes = pColInfoData->info.bytes; + int32_t payloadLen = bytes * rowCapacity; + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + // offset segment + content length + payload + offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i]; + } else { + // bitmap + content length + payload + offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i]; + } + } + + return offset; +} + +static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { + SPartitionOperatorInfo* pInfo = pOperator->info; + + SDataGroupInfo* pGroupInfo = pInfo->pGroupIter; + if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + // try next group data + pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); + if (pInfo->pGroupIter == NULL) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + pGroupInfo = pInfo->pGroupIter; + pInfo->pageIndex = 0; + } + + int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); + void* page = getBufPage(pInfo->pBuf, *pageId); + + blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); + + pInfo->pageIndex += 1; + + pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; + return pInfo->binfo.pRes; +} + +static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + SGroupbyOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + blockDataCleanup(pRes); + return buildPartitionResult(pOperator); } - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, - pInfo->pDataBlock, pTaskInfo->id.str); + SOperatorInfo* downstream = pOperator->pDownstream[0]; - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { + break; + } - SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); - ps->param = pOperator->pDownstream[0]; - tsortAddSource(pInfo->pSortHandle, ps); - - int32_t code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + doHashPartition(pOperator, pBlock); } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + blockDataEnsureCapacity(pRes, 4096); + return buildPartitionResult(pOperator); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { + SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + taosArrayDestroy(pInfo->pGroupCols); + taosArrayDestroy(pInfo->pGroupColVals); + taosMemoryFree(pInfo->keyBuf); + taosMemoryFree(pInfo->columnOffset); +} + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResultBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->pGroupCols = pGroupColList; + + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + + int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf)); + pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pOperator->name = "PartitionOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; + + pInfo->binfo.pRes = pResultBlock; + pOperator->numOfOutput = numOfCols; + pOperator->pExpr = pExprInfo; pOperator->info = pInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashPartition; + pOperator->closeFn = destroyPartitionOperatorInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doPartitionData; -// pOperator->closeFn = destroyOrderOperatorInfo; - - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); return NULL; } \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7181fd0991..c67f833699 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -255,6 +255,11 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->getNextFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; + static int32_t cost = 0; + pOperator->cost.openCost = ++cost; + pOperator->cost.totalCost = ++cost; + pOperator->resultInfo.totalRows = ++cost; + return pOperator; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7a57d62969..5dfda92982 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -425,9 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); - size_t pgSize = pHandle->pageSize; - int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock); - + int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); diff --git a/source/libs/nodes/src/nodesToSQLFuncs.c b/source/libs/nodes/src/nodesToSQLFuncs.c index 68f38bb6a7..b8e071ec0d 100644 --- a/source/libs/nodes/src/nodesToSQLFuncs.c +++ b/source/libs/nodes/src/nodesToSQLFuncs.c @@ -21,7 +21,7 @@ #include "taoserror.h" #include "thash.h" -char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "&", "|", ">", ">=", "<", "<=", "=", "<>", +char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "-", "&", "|", ">", ">=", "<", "<=", "=", "<>", "IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL", "IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"}; char *gLogicConditionStr[] = {"AND", "OR", "NOT"}; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 95a3280fc2..074ec4dcae 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -644,7 +644,7 @@ SNodeList* nodesMakeList() { int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) { if (NULL == pList || NULL == pNode) { - return TSDB_CODE_SUCCESS; + return TSDB_CODE_FAILED; } SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); if (NULL == p) { @@ -688,7 +688,7 @@ int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode) { int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) { if (NULL == pTarget || NULL == pSrc) { - return TSDB_CODE_SUCCESS; + return TSDB_CODE_FAILED; } if (NULL == pTarget->pHead) { @@ -717,11 +717,34 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) { return code; } +int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode) { + if (NULL == pList || NULL == pNode) { + return TSDB_CODE_FAILED; + } + SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); + if (NULL == p) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; + } + p->pNode = pNode; + if (NULL != pList->pHead) { + pList->pHead->pPrev = p; + p->pNext = pList->pHead; + } + pList->pHead = p; + ++(pList->length); + return TSDB_CODE_SUCCESS; +} + SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) { if (NULL == pCell->pPrev) { pList->pHead = pCell->pNext; } else { pCell->pPrev->pNext = pCell->pNext; + } + if (NULL == pCell->pNext) { + pList->pTail = pCell->pPrev; + } else { pCell->pNext->pPrev = pCell->pPrev; } SListCell* pNext = pCell->pNext; @@ -731,6 +754,24 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) { return pNext; } +void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) { + if (NULL == pTarget || NULL == pPos || NULL == pSrc) { + return; + } + + if (NULL == pPos->pPrev) { + pTarget->pHead = pSrc->pHead; + } else { + pPos->pPrev->pNext = pSrc->pHead; + } + pSrc->pHead->pPrev = pPos->pPrev; + pSrc->pTail->pNext = pPos; + pPos->pPrev = pSrc->pTail; + + pTarget->length += pSrc->length; + taosMemoryFreeClear(pSrc); +} + SNodeptr nodesListGetNode(SNodeList* pList, int32_t index) { SNode* node; FOREACH(node, pList) { diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index d21628956a..0b4f79d3ff 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -104,6 +104,8 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) { } pOp->opType = OP_TYPE_IS_TRUE; pOp->pLeft = pSrc; + pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; *pIsTrue = (SNode*)pOp; return TSDB_CODE_SUCCESS; } @@ -198,6 +200,8 @@ static int32_t calcConstQuery(SNode* pStmt) { switch (nodeType(pStmt)) { case QUERY_NODE_SELECT_STMT: return calcConstSelect((SSelectStmt*)pStmt); + case QUERY_NODE_EXPLAIN_STMT: + return calcConstQuery(((SExplainStmt*)pStmt)->pQuery); default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 88c3d5b549..ebe5b639c4 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -743,24 +743,99 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { return code; } -static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool* pIsSelectStar) { +static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) { + *pCols = nodesMakeList(); + if (NULL == *pCols) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY); + } + SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel); + size_t nums = taosArrayGetSize(pTables); + for (size_t i = 0; i < nums; ++i) { + STableNode* pTable = taosArrayGetP(pTables, i); + int32_t code = createColumnNodeByTable(pCxt, pTable, *pCols); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + +static bool isFirstLastFunc(SFunctionNode* pFunc) { + return (FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType); +} + +static bool isFirstLastStar(SNode* pNode) { + if (QUERY_NODE_FUNCTION != nodeType(pNode) || !isFirstLastFunc((SFunctionNode*)pNode)) { + return false; + } + SNodeList* pParameterList = ((SFunctionNode*)pNode)->pParameterList; + if (LIST_LENGTH(pParameterList) != 1) { + return false; + } + SNode* pParam = nodesListGetNode(pParameterList, 0); + return (QUERY_NODE_COLUMN == nodeType(pParam) ? 0 == strcmp(((SColumnNode*)pParam)->colName, "*") : false); +} + +static SNode* createFirstLastFunc(SFunctionNode* pSrcFunc, SColumnNode* pCol) { + SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pFunc) { + return NULL; + } + pFunc->pParameterList = nodesMakeList(); + if (NULL == pFunc->pParameterList || TSDB_CODE_SUCCESS != nodesListAppend(pFunc->pParameterList, pCol)) { + nodesDestroyNode(pFunc); + return NULL; + } + + pFunc->node.resType = pCol->node.resType; + pFunc->funcId = pSrcFunc->funcId; + pFunc->funcType = pSrcFunc->funcType; + strcpy(pFunc->functionName, pSrcFunc->functionName); + snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName), (FUNCTION_TYPE_FIRST == pSrcFunc->funcType ? "first(%s)" : "last(%s)"), pCol->colName); + + return (SNode*)pFunc; +} + +static int32_t createFirstLastAllCols(STranslateContext* pCxt, SFunctionNode* pSrcFunc, SNodeList** pOutput) { + SNodeList* pCols = NULL; + if (TSDB_CODE_SUCCESS != createAllColumns(pCxt, &pCols)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pFuncs = nodesMakeList(); + if (NULL == pFuncs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SNode* pCol = NULL; + FOREACH(pCol, pCols) { + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pFuncs, createFirstLastFunc(pSrcFunc, (SColumnNode*)pCol))) { + nodesDestroyNode(pFuncs); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + *pOutput = pFuncs; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL == pSelect->pProjectionList) { // select * ... - SArray* pTables = taosArrayGetP(pCxt->pNsLevel, pCxt->currLevel); - size_t nums = taosArrayGetSize(pTables); - pSelect->pProjectionList = nodesMakeList(); - if (NULL == pSelect->pProjectionList) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY); - } - for (size_t i = 0; i < nums; ++i) { - STableNode* pTable = taosArrayGetP(pTables, i); - int32_t code = createColumnNodeByTable(pCxt, pTable, pSelect->pProjectionList); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - } - *pIsSelectStar = true; + return createAllColumns(pCxt, &pSelect->pProjectionList); } else { // todo : t.* + SNode* pNode = NULL; + WHERE_EACH(pNode, pSelect->pProjectionList) { + if (isFirstLastStar(pNode)) { + SNodeList* pFuncs = NULL; + if (TSDB_CODE_SUCCESS != createFirstLastAllCols(pCxt, (SFunctionNode*)pNode, &pFuncs)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + INSERT_LIST(pSelect->pProjectionList, pFuncs); + ERASE_NODE(pSelect->pProjectionList); + continue; + } + WHERE_NEXT; + } } return TSDB_CODE_SUCCESS; } @@ -797,8 +872,8 @@ static int32_t getPositionValue(const SValueNode* pVal) { static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pProjectionList, SNodeList* pOrderByList, bool* pOther) { *pOther = false; - SNode* pNode; - FOREACH(pNode, pOrderByList) { + SNode* pNode = NULL; + WHERE_EACH(pNode, pOrderByList) { SNode* pExpr = ((SOrderByExprNode*)pNode)->pExpr; if (QUERY_NODE_VALUE == nodeType(pExpr)) { SValueNode* pVal = (SValueNode*)pExpr; @@ -823,6 +898,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro } else { *pOther = true; } + WHERE_NEXT; } return TSDB_CODE_SUCCESS; } @@ -845,11 +921,10 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) { } static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect) { - bool isSelectStar = false; - int32_t code = translateStar(pCxt, pSelect, &isSelectStar); - if (TSDB_CODE_SUCCESS == code && !isSelectStar) { - pCxt->currClause = SQL_CLAUSE_SELECT; - code = translateExprList(pCxt, pSelect->pProjectionList); + pCxt->currClause = SQL_CLAUSE_SELECT; + int32_t code = translateExprList(pCxt, pSelect->pProjectionList); + if (TSDB_CODE_SUCCESS == code) { + code = translateStar(pCxt, pSelect); } if (TSDB_CODE_SUCCESS == code) { code = checkExprListForGroupBy(pCxt, pSelect->pProjectionList); @@ -1008,6 +1083,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS pReq->cacheLastRow = GET_OPTION_VAL(pStmt->pOptions->pCachelast, TSDB_DEFAULT_CACHE_LAST_ROW); pReq->ignoreExist = pStmt->ignoreExists; pReq->streamMode = GET_OPTION_VAL(pStmt->pOptions->pStreamMode, TSDB_DEFAULT_DB_STREAM_MODE_OPTION); + pReq->ttl = GET_OPTION_VAL(pStmt->pOptions->pTtl, TSDB_DEFAULT_DB_TTL_OPTION); + pReq->singleSTable = GET_OPTION_VAL(pStmt->pOptions->pSingleStable, TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION); return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq); } @@ -1829,10 +1906,13 @@ static int32_t getSmaIndexBuildAst(STranslateContext* pCxt, SCreateIndexStmt* pS pSelect->pFromTable = (SNode*)pTable; pSelect->pProjectionList = nodesCloneList(pStmt->pOptions->pFuncs); - if (NULL == pTable) { + SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pSelect->pProjectionList || NULL == pFunc) { nodesDestroyNode(pSelect); return TSDB_CODE_OUT_OF_MEMORY; } + strcpy(pFunc->functionName, "_wstartts"); + nodesListPushFront(pSelect->pProjectionList, pFunc); SNode* pProject = NULL; FOREACH(pProject, pSelect->pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#sma_%p", pProject); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 1e91c55dc0..d211e780b0 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -121,6 +121,23 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SDbCfgReq dbCfgReq = {0}; + strcpy(dbCfgReq.db, input); + + int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; @@ -309,17 +326,36 @@ PROCESS_QLIST_OVER: return code; } +int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { + SDbCfgRsp out = {0}; + + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) { + qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + memcpy(output, &out, sizeof(out)); + + return TSDB_CODE_SUCCESS; +} + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; - queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; - queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; } #pragma GCC diagnostic pop diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a83fcc26f2..f0f04a8a9b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -977,10 +977,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); + //QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); + //QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - queryRsped = true; + //queryRsped = true; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); @@ -994,10 +994,10 @@ _return: input.code = code; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - if (!queryRsped) { - qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - } + //if (!queryRsped) { + // qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); + // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); + //} QW_RET(TSDB_CODE_SUCCESS); } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 393f143e5f..16236fc05b 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -687,11 +687,15 @@ int32_t filterGetRangeRes(void* h, SFilterRange *ra) { SFilterRangeNode* r = ctx->rs; while (r) { - FILTER_COPY_RA(ra, &r->ra); + if (num) { + ra->e = r->ra.e; + ra->eflag = r->ra.eflag; + } else { + FILTER_COPY_RA(ra, &r->ra); + } ++num; r = r->next; - ++ra; } if (num == 0) { @@ -3314,8 +3318,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t -int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) { - SFilterInfo *info = NULL; +int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *isStrict) { SFilterRange ra = {0}; SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP); SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP); @@ -3369,13 +3372,14 @@ int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) { *win = TSWINDOW_INITIALIZER; } else { filterGetRangeNum(prev, &num); - if (num > 1) { - qError("only one time range accepted, num:%d", num); - FLT_ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION); - } FLT_CHK_JMP(num < 1); + if (num > 1) { + *isStrict = false; + qDebug("more than one time range, num:%d", num); + } + SFilterRange tra; filterGetRangeRes(prev, &tra); win->skey = tra.s; @@ -3401,6 +3405,30 @@ _return: } +int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) { + SFilterInfo *info = NULL; + int32_t code = 0; + + *isStrict = true; + + FLT_ERR_RET(filterInitFromNode(pNode, &info, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP)); + + if (info->scalarMode) { + *win = TSWINDOW_INITIALIZER; + *isStrict = false; + goto _return; + } + + FLT_ERR_JRET(filterGetTimeRangeImpl(info, win, isStrict)); + +_return: + + filterFreeInfo(info); + + FLT_RET(code); +} + + int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) { if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index df9e1bf36d..7ce7b422ac 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -295,11 +295,11 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *in = pInputData->pData; + char *in = pInputData->pData + pInputData->varmeta.offset[0]; int16_t *out = (int16_t *)pOutputData->pData; for (int32_t i = 0; i < pInput->numOfRows; ++i) { - if (colDataIsNull_f(pInputData->nullbitmap, i)) { + if (colDataIsNull_s(pInputData, i)) { colDataSetNull_f(pOutputData->nullbitmap, i); continue; } @@ -357,7 +357,7 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu } for (int32_t i = 0; i < inputNum; ++i) { pInputData[i] = pInput[i].columnData; - input[i] = pInputData[i]->pData; + input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; int32_t factor = 1; if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { factor = TSDB_NCHAR_SIZE; @@ -438,7 +438,7 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } for (int32_t i = 0; i < inputNum; ++i) { pInputData[i] = pInput[i].columnData; - input[i] = pInputData[i]->pData; + input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; int32_t factor = 1; if (hasNcharCol && (GET_PARAM_TYPE(&pInput[i]) == TSDB_DATA_TYPE_VARCHAR)) { factor = TSDB_NCHAR_SIZE; @@ -509,7 +509,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *input = pInputData->pData; + char *input = pInputData->pData + pInputData->varmeta.offset[0]; char *output = NULL; int32_t outputLen = pInputData->varmeta.length; @@ -554,7 +554,7 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *input = pInputData->pData; + char *input = pInputData->pData + pInputData->varmeta.offset[0]; char *output = NULL; int32_t outputLen = pInputData->varmeta.length; @@ -606,7 +606,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *input = pInputData->pData; + char *input = pInputData->pData + pInputData->varmeta.offset[0]; char *output = NULL; int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows; diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index 1e6978d889..26ef5dbd44 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -241,6 +241,7 @@ TEST(timerangeTest, greater) { bool isStrict = false; int32_t code = filterGetTimeRange(opNode1, &win, &isStrict); ASSERT_EQ(code, 0); + ASSERT_EQ(isStrict, true); ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.ekey, INT64_MAX); //filterFreeInfo(filter); @@ -270,6 +271,7 @@ TEST(timerangeTest, greater_and_lower) { STimeWindow win = {0}; bool isStrict = false; int32_t code = filterGetTimeRange(logicNode, &win, &isStrict); + ASSERT_EQ(isStrict, true); ASSERT_EQ(code, 0); ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.ekey, tbig); @@ -277,6 +279,56 @@ TEST(timerangeTest, greater_and_lower) { nodesDestroyNode(logicNode); } +TEST(timerangeTest, greater_and_lower_not_strict) { + SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL; + bool eRes[5] = {false, false, true, true, true}; + SScalarParam res = {0}; + int64_t tsmall1 = 222, tbig1 = 333; + int64_t tsmall2 = 444, tbig2 = 555; + SNode *list[2] = {0}; + + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall1); + flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig1); + flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + list[0] = opNode1; + list[1] = opNode2; + + flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_AND, list, 2); + + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall2); + flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig2); + flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + list[0] = opNode1; + list[1] = opNode2; + + flttMakeLogicNode(&logicNode2, LOGIC_COND_TYPE_AND, list, 2); + + list[0] = logicNode1; + list[1] = logicNode2; + flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_OR, list, 2); + + //SFilterInfo *filter = NULL; + //int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); + //ASSERT_EQ(code, 0); + STimeWindow win = {0}; + bool isStrict = false; + int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict); + ASSERT_EQ(isStrict, false); + ASSERT_EQ(code, 0); + ASSERT_EQ(win.skey, tsmall1); + ASSERT_EQ(win.ekey, tbig2); + //filterFreeInfo(filter); + nodesDestroyNode(logicNode1); +} + + + TEST(columnTest, smallint_column_greater_double_value) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int16_t leftv[5]= {1, 2, 3, 4, 5}; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1675f91330..5fb6069eff 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1235,6 +1235,34 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { + int32_t s = taosHashGetSize(pTaskList); + if (s <= 0) { + return TSDB_CODE_SUCCESS; + } + + SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId)); + if (NULL == task || NULL == (*task)) { + return TSDB_CODE_SUCCESS; + } + + *pTask = *task; + + return TSDB_CODE_SUCCESS; +} + +int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) { + if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) { + return TSDB_CODE_SUCCESS; + } + + SSchNodeInfo *nodeInfo = taosArrayGet(pTask->execNodes, 0); + nodeInfo->handle = handle; + + return TSDB_CODE_SUCCESS; +} + + int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; @@ -1247,22 +1275,25 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } - int32_t s = taosHashGetSize(pJob->execTasks); - if (s <= 0) { - SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); + schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask); + if (NULL == pTask) { + if (TDMT_VND_EXPLAIN_RSP == msgType) { + schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask); + } else { + SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + } + + if (NULL == pTask) { + SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); - if (NULL == task || NULL == (*task)) { - SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - - pTask = *task; SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); SCH_SET_TASK_HANDLE(pTask, pMsg->handle); + schUpdateTaskExecNodeHandle(pTask, pMsg->handle, rspCode); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); _return: diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d9c288a39b..0ab2f4e010 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -130,9 +130,9 @@ static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) -#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ do { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ @@ -154,20 +154,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ - do { \ - int i = 0, sz = transQueueSize(&conn->cliMsgs); \ - for (; i < sz; i++) { \ - pMsg = transQueueGet(&conn->cliMsgs, i); \ +#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ + do { \ + int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + for (; i < sz; i++) { \ + pMsg = transQueueGet(&conn->cliMsgs, i); \ if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ - break; \ - } \ - } \ - if (i == sz) { \ - pMsg = NULL; \ - } else { \ - pMsg = transQueueRm(&conn->cliMsgs, i); \ - } \ + break; \ + } \ + } \ + if (i == sz) { \ + pMsg = NULL; \ + } else { \ + pMsg = transQueueRm(&conn->cliMsgs, i); \ + } \ } while (0) #define CONN_GET_NEXT_SENDMSG(conn) \ do { \ @@ -205,12 +205,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd); transRefCliHandle(conn); \ } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) \ - (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define CONN_RELEASE_BY_SERVER(conn) \ - (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) -#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) + +#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) + +#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) +#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) static void* cliWorkThread(void* arg); @@ -281,9 +281,8 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, - TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), - taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), + taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; @@ -349,12 +348,10 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, - TMSG_INFO(transMsg.msgType)); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, - transMsg.ahandle); + tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -628,9 +625,8 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -722,10 +718,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->hThrdIdx = pCtx->hThrdIdx; transCtxMerge(&conn->ctx, &pCtx->appCtx); - if (!transQueuePush(&conn->cliMsgs, pMsg)) { - return; - } - transDestroyBuffer(&conn->readBuf); + transQueuePush(&conn->cliMsgs, pMsg); + // tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2); + // return; + //} + // transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); @@ -740,8 +737,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (ret) { tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); } + struct sockaddr_in addr; - uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); + + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip); + addr.sin_port = (uint16_t)htons((uint16_t)conn->port); + // uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); @@ -943,8 +945,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d", index, pMsg, ip, port); - transSendAsync(thrd->asyncPool, &(cliMsg->q)); + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle); + ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index d834263b94..84a2efb46c 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -48,10 +48,8 @@ struct SDiskbasedBuf { }; static int32_t createDiskFile(SDiskbasedBuf* pBuf) { - // pBuf->file = fopen(pBuf->path, "wb+"); - pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (pBuf->pFile == NULL) { - // qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } diff --git a/tests/script/tsim/query/charScalarFunction.sim b/tests/script/tsim/query/charScalarFunction.sim index d7b1139b5b..13108fecb7 100644 --- a/tests/script/tsim/query/charScalarFunction.sim +++ b/tests/script/tsim/query/charScalarFunction.sim @@ -286,25 +286,25 @@ endi #sql_error select c1, ltrim(t1), c2, ltrim(t2) from ctb3 -#print ====> rtrim -#sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ntb3 -#print ====> select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3 -#sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3 -#print ====> rows: $rows -#print ====> [ $data00 ] [ $data01 ] [ $data02 ] [ $data03 ] [ $data04 ] [ $data05 ] [ $data06 ] -#print ====> $data10 $data11 $data12 $data13 $data14 $data15 -#if $rows != 1 then -# return -1 -#endi -#if $data01 != @ abcd 1234@ then -# return -1 -#endi -#if $data03 != @ abcd 1234@ then -# return -1 -#endi -#if $data04 != @ abcdEFGH =-*&%@ then -# return -1 -#endi +print ====> rtrim +sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ntb3 +print ====> select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3 +sql select c1, rtrim(c1), c2, rtrim(c2), rtrim(" abcdEFGH =-*&% ") from ctb3 +print ====> rows: $rows +print ====> [ $data00 ] [ $data01 ] [ $data02 ] [ $data03 ] [ $data04 ] [ $data05 ] [ $data06 ] +print ====> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 1 then + return -1 +endi +if $data01 != @ abcd 1234@ then + return -1 +endi +if $data03 != @ abcd 1234@ then + return -1 +endi +if $data04 != @ abcdEFGH =-*&%@ then + return -1 +endi #sql_error select c1, rtrim(t1), c2, rtrim(t2) from ctb3 diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 22d8c2b735..7372745cb8 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -274,17 +274,11 @@ int main(int32_t argc, char *argv[]) { loop_consume(tmq); - err = tmq_unsubscribe(tmq); - ASSERT(err == TMQ_RESP_ERR__SUCCESS); - - - #if 0 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - #endif return 0; }