From 9ef605faa00f679a88d16142b2c83ee81f3cee39 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jan 2022 14:24:09 +0800 Subject: [PATCH 1/4] [td-11818]fix memory leak. --- source/dnode/vnode/src/vnd/vnodeQuery.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 9390e19523..4012914723 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -156,6 +156,11 @@ _exit: return 0; } +static void freeItemHelper(void* pItem) { + char* p = *(char**)pItem; + free(p); +} + /** * @param pVnode * @param pMsg @@ -169,17 +174,20 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { int32_t totalLen = 0; int32_t numOfTables = 0; while ((name = metaTbCursorNext(pCur)) != NULL) { - if (numOfTables < 1000) { // TODO: temp get tables of vnode, and should del when show tables commad ok. + if (numOfTables < 10000) { // TODO: temp get tables of vnode, and should del when show tables commad ok. taosArrayPush(pArray, &name); totalLen += strlen(name); + } else { + tfree(name); } + numOfTables++; } // TODO: temp debug, and should del when show tables command ok - vError("====vgId:%d, numOfTables: %d", pVnode->vgId, numOfTables); - if (numOfTables > 1000) { - numOfTables = 1000; + vInfo("====vgId:%d, numOfTables: %d", pVnode->vgId, numOfTables); + if (numOfTables > 10000) { + numOfTables = 10000; } metaCloseTbCursor(pCur); @@ -214,6 +222,6 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { }; rpcSendResponse(&rpcMsg); - taosArrayDestroy(pArray); + taosArrayDestroyEx(pArray, freeItemHelper); return 0; } From eb712702b9b747ffc65a5481f19d4120a31b7e19 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jan 2022 15:42:37 +0800 Subject: [PATCH 2/4] [td-11818]fix bug in select * from super_table. --- source/libs/executor/inc/executorimpl.h | 5 +- source/libs/executor/src/executorimpl.c | 151 +++++++++++++++--------- 2 files changed, 98 insertions(+), 58 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 73a30a62f5..ebf3d83a1a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -372,11 +372,14 @@ typedef struct STaskParam { typedef struct SExchangeInfo { SArray *pSources; - uint64_t bytes; // total load bytes from remote tsem_t ready; void *pTransporter; SRetrieveTableRsp *pRsp; SSDataBlock *pResult; + int32_t current; + uint64_t rowsOfCurrentSource; + uint64_t bytes; // total load bytes from remote + uint64_t totalRows; } SExchangeInfo; typedef struct STableScanInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e963c49c86..02827ec32c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5144,72 +5144,110 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; *newgroup = false; - if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) { + + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + if (pExchangeInfo->current >= totalSources) { return NULL; } - SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { // todo handle malloc error - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _error; - } + SResFetchReq* pMsg = NULL; + SMsgSendInfo* pMsgSendInfo = NULL; - SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); - SEpSet epSet = {0}; - - epSet.numOfEps = pSource->addr.numOfEps; - epSet.port[0] = pSource->addr.epAddr[0].port; - tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); - - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _error; - } - - pMsgSendInfo->param = pExchangeInfo; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = TDMT_VND_FETCH; - pMsgSendInfo->fp = loadRemoteDataCallback; - - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pExchangeInfo->ready); - - if (pExchangeInfo->pRsp->numOfRows == 0) { - return NULL; - } - - SSDataBlock* pRes = pExchangeInfo->pResult; - char* pData = pExchangeInfo->pRsp->data; - - for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); - if (tmp == NULL) { + while(1) { + pMsg = calloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { // todo handle malloc error + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _error; } - size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; - memcpy(tmp, pData, len); + SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); + SEpSet epSet = {0}; - pColInfoData->pData = tmp; - pData += len; + epSet.numOfEps = pSource->addr.numOfEps; + epSet.port[0] = pSource->addr.epAddr[0].port; + tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); + + qDebug("QID:0x%" PRIx64 " build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources); + + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); + + // send the fetch remote task result reques + pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("QID:0x%" PRIx64 " prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _error; + } + + pMsgSendInfo->param = pExchangeInfo; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgType = TDMT_VND_FETCH; + pMsgSendInfo->fp = loadRemoteDataCallback; + + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + tsem_wait(&pExchangeInfo->ready); + + SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; + if (pRsp->numOfRows == 0) { + if (pExchangeInfo->current >= taosArrayGetSize(pExchangeInfo->pSources)) { + return NULL; + } + + qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, + pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows); + + pExchangeInfo->rowsOfCurrentSource = 0; + pExchangeInfo->current += 1; + continue; + } + + SSDataBlock* pRes = pExchangeInfo->pResult; + char* pData = pRsp->data; + + for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); + if (tmp == NULL) { + goto _error; + } + + size_t len = pRsp->numOfRows * pColInfoData->info.bytes; + memcpy(tmp, pData, len); + + pColInfoData->pData = tmp; + pData += len; + } + + pRes->info.numOfCols = pOperator->numOfOutput; + pRes->info.rows = pRsp->numOfRows; + + pExchangeInfo->totalRows += pRsp->numOfRows; + pExchangeInfo->bytes += pRsp->compLen; + pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows; + + if (pRsp->completed == 1) { + qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes, + pExchangeInfo->current + 1, totalSources); + + pExchangeInfo->rowsOfCurrentSource = 0; + pExchangeInfo->current += 1; + } else { + qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes); + } + + return pExchangeInfo->pResult; } - pRes->info.numOfCols = pOperator->numOfOutput; - pRes->info.rows = pExchangeInfo->pRsp->numOfRows; - - return pExchangeInfo->pResult; - _error: tfree(pMsg); tfree(pMsgSendInfo); @@ -7719,7 +7757,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); } else if (pPhyNode->info.type == OP_StreamScan) { - size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo); } From f5916bcf478fe841acc68bcfa58bfb6008d542c0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jan 2022 22:51:48 +0800 Subject: [PATCH 3/4] [td-11818] fix bug in query. --- source/libs/executor/src/executorMain.c | 7 ------- source/libs/executor/src/executorimpl.c | 20 +++++++++++++------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index c5b57ee1a5..e5d56aca15 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -164,13 +164,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { return TSDB_CODE_SUCCESS; } - // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; - // if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) { - // qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo)); - // setTaskStatus(pTaskInfo, TASK_COMPLETED); - // return doBuildResCheck(pTaskInfo); - // } - // error occurs, record the error code and return to client int32_t ret = setjmp(pTaskInfo->env); if (ret != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 02827ec32c..4ce652f5fd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4958,6 +4958,10 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; + if (pTableScanInfo->pTsdbReadHandle == NULL) { + return NULL; + } + SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; *newgroup = false; @@ -5161,8 +5165,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { } SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); - SEpSet epSet = {0}; + SEpSet epSet = {0}; epSet.numOfEps = pSource->addr.numOfEps; epSet.port[0] = pSource->addr.epAddr[0].port; tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); @@ -5195,17 +5199,18 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; if (pRsp->numOfRows == 0) { - if (pExchangeInfo->current >= taosArrayGetSize(pExchangeInfo->pSources)) { - return NULL; - } - qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows); pExchangeInfo->rowsOfCurrentSource = 0; pExchangeInfo->current += 1; - continue; + + if (pExchangeInfo->current >= totalSources) { + return NULL; + } else { + continue; + } } SSDataBlock* pRes = pExchangeInfo->pResult; @@ -7823,11 +7828,12 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, if (groupInfo.numOfTables == 0) { code = 0; - // qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId); + qDebug("no table qualified for query, reqId:0x%"PRIx64, queryId); goto _error; } return createDataReadHandle(pTableScanNode, &groupInfo, readerHandle, queryId); + _error: terrno = code; return NULL; From cf4b5594d43e45813388d253bf0271950a9f6c59 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jan 2022 22:59:19 +0800 Subject: [PATCH 4/4] [td-11818] fix crash in show. --- source/client/test/clientTests.cpp | 104 ++++++++++++------------- source/libs/parser/src/dCDAstProcess.c | 8 +- 2 files changed, 58 insertions(+), 54 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 854f18bd79..ee23567705 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -567,58 +567,58 @@ TEST(testCase, show_table_Test) { // taos_free_result(pRes); // taos_close(pConn); //} -// -//TEST(testCase, projection_query_tables) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_NE(pConn, nullptr); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -//// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); -//// if (taos_errno(pRes) != 0) { -//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); -//// if (taos_errno(pRes) != 0) { -//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// for(int32_t i = 0; i < 100; ++i) { -//// char sql[512] = {0}; -//// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); -//// TAOS_RES* p = taos_query(pConn, sql); -//// if (taos_errno(p) != 0) { -//// printf("failed to insert data, reason:%s\n", taos_errstr(p)); -//// } -//// -//// taos_free_result(p); -//// } -// -// pRes = taos_query(pConn, "select * from tu"); -// if (taos_errno(pRes) != 0) { -// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while ((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// + +TEST(testCase, projection_query_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tu using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + for(int32_t i = 0; i < 100000; ++i) { + char sql[512] = {0}; + sprintf(sql, "insert into tu values(now+%da, %d)", i, i); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } + + taos_free_result(p); + } + + pRes = taos_query(pConn, "select * from tu"); + if (taos_errno(pRes) != 0) { + printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + //TEST(testCase, projection_query_stables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_NE(pConn, nullptr); diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 994875c0a3..06a9a3d16e 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -26,7 +26,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out const char* msg4 = "pattern is invalid"; const char* msg5 = "database name is empty"; const char* msg6 = "pattern string is empty"; - const char* msg7 = "db is not specified"; + const char* msg7 = "database not specified"; /* * database prefix in pInfo->pMiscInfo->a[0] * wildcard in like clause in pInfo->pMiscInfo->a[1] @@ -50,7 +50,11 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out char dbFname[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(&name, dbFname); - catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array); + int32_t code = catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return code; + } SVgroupInfo* info = taosArrayGet(array, 0); pShowReq->head.vgId = htonl(info->vgId);