diff --git a/2.0/src/query/src/queryMain.c b/2.0/src/query/src/queryMain.c index 3bf031ebd5..e6d804cea3 100644 --- a/2.0/src/query/src/queryMain.c +++ b/2.0/src/query/src/queryMain.c @@ -461,7 +461,7 @@ int32_t qKillQuery(qinfo_t qinfo) { } qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId); - setQueryKilled(pQInfo); + setTaskKilled(pQInfo); // Wait for the query executing thread being stopped/ // Once the query is stopped, the owner of qHandle will be cleared immediately. @@ -634,7 +634,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return TSDB_CODE_QRY_INVALID_QHANDLE; } qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); - setQueryKilled(pQInfo); + setTaskKilled(pQInfo); // wait query stop int32_t loop = 0; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9ee1aeca86..b3d595d73b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1519,15 +1519,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { } typedef struct SMqSetCVgReq { - int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - SSubQueryMsg msg; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + uint32_t qmsgLen; + void* qmsg; + //SSubQueryMsg msg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { @@ -1536,7 +1538,7 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* tlen += taosEncodeFixedU64(buf, pMsg->queryId); tlen += taosEncodeFixedU64(buf, pMsg->taskId); tlen += taosEncodeFixedU32(buf, pMsg->contentLen); - tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); + //tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); return tlen; } @@ -1545,7 +1547,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { buf = taosDecodeFixedU64(buf, &pMsg->queryId); buf = taosDecodeFixedU64(buf, &pMsg->taskId); buf = taosDecodeFixedU32(buf, &pMsg->contentLen); - buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); + //buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); return buf; } @@ -1559,7 +1561,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); + tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); + tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen); + //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1572,15 +1576,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = tDecodeSSubQueryMsg(buf, &pReq->msg); + buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); + buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen); + //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } typedef struct SMqSetCVgRsp { - int32_t vgId; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + SMsgHead header; + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; typedef struct SMqColData { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 457245e9a3..020f9f7ccd 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,7 +32,7 @@ struct SSubplan; * @param streamReadHandle * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index ecfa344f85..1d193888c9 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -74,6 +74,7 @@ void columnListCopy(SArray* dst, const SArray* src, uint64_t uid); void columnListDestroy(SArray* pColumnList); void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel); +void dropOneLevelExprInfo(SArray* pExprInfo); typedef struct SSourceParam { SArray *pExprNodeList; //Array diff --git a/include/os/os.h b/include/os/os.h index 9112b4922f..023d2b0470 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -51,6 +51,7 @@ extern "C" { #include #include +#include #include "osAtomic.h" #include "osDef.h" diff --git a/include/os/osDef.h b/include/os/osDef.h index 040c4bc7e7..07d360a7c0 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -181,7 +181,8 @@ extern "C" { #endif #else // Windows - #define setThreadName(name) +// #define setThreadName(name) +#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0) #endif #if defined(_WIN32) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6014042e11..179e46527f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -218,6 +218,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, if (pQueryNode->type == TSDB_SQL_SELECT) { setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols); + tfree(pSchema); pRequest->type = TDMT_VND_QUERY; } else { tfree(pSchema); @@ -313,10 +314,10 @@ tmq_conf_t* tmq_conf_new() { } int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { - if (strcmp(key, "group.id")) { + if (strcmp(key, "group.id") == 0) { strcpy(conf->groupId, value); } - if (strcmp(key, "client.id")) { + if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); } return 0; @@ -364,7 +365,7 @@ tmq_list_t* tmq_list_new() { int32_t tmq_list_append(tmq_list_t* ptr, char* src) { if (ptr->cnt >= ptr->tot-1) return -1; - ptr->elems[ptr->cnt] = src; + ptr->elems[ptr->cnt] = strdup(src); ptr->cnt++; return 0; } @@ -376,8 +377,23 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { int32_t sz = topic_list->cnt; tmq->clientTopics = taosArrayInit(sz, sizeof(void*)); for (int i = 0; i < sz; i++) { - char* topicName = strdup(topic_list->elems[i]); - taosArrayPush(tmq->clientTopics, &topicName); + char* topicName = topic_list->elems[i]; + + SName name = {0}; + char* dbName = getDbOfConnection(tmq->pTscObj); + tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); + tNameFromString(&name, topicName, T_NAME_TABLE); + + char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN); + if (topicFname == NULL) { + + } + tNameExtractFullName(&name, topicFname); + tscDebug("subscribe topic: %s", topicFname); + taosArrayPush(tmq->clientTopics, &topicFname); + /*SMqClientTopic topic = {*/ + /*.*/ + /*};*/ } SCMSubscribeReq req; req.topicNum = taosArrayGetSize(tmq->clientTopics); @@ -401,7 +417,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { } pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; - pRequest->type = TDMT_MND_CREATE_TOPIC; + pRequest->type = TDMT_MND_SUBSCRIBE; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -534,7 +550,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, SCMCreateTopicReq req = { .name = (char*) topicFname, - .igExists = 0, + .igExists = 1, .physicalPlan = (char*) pStr, .sql = (char*) sql, .logicalPlan = "no logic plan", diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index c57aedee2e..d62d7cb826 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -458,10 +458,9 @@ TEST(testCase, show_table_Test) { assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show tables"); - ASSERT_NE(taos_errno(pRes), 0); - if (taos_errno(pRes) != 0) { - printf("expected failed to show tables, reason:%s\n", taos_errstr(pRes)); + printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); } taos_free_result(pRes); @@ -537,6 +536,7 @@ TEST(testCase, create_topic_Test) { if (taos_errno(pRes) != 0) { printf("error in use db, reason:%s\n", taos_errstr(pRes)); } + taos_free_result(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes); ASSERT_TRUE(pFields == nullptr); @@ -570,6 +570,51 @@ TEST(testCase, insert_test) { taos_close(pConn); } +#if 0 +TEST(testCase, tmq_subscribe_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg1"); + tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_topic_1"); + tmq_subscribe(tmq, topic_list); + + while (1) { + tmq_message_t* msg = tmq_consume_poll(tmq, 0); + printf("get msg\n"); + if (msg == NULL) break; + } +} + +TEST(testCase, tmq_consume_Test) { +} + +TEST(testCase, tmq_commit_TEST) { +} +#endif + +//TEST(testCase, insert_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} + TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 507f9a2644..b127fb1d64 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -487,6 +487,8 @@ static void *dnodeThreadRoutine(void *param) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; int32_t ms = pDnode->cfg.statusInterval * 1000; + setThreadName("dnode-hb"); + while (true) { pthread_testcancel(); taosMsleep(ms); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 4ccab751b3..1da13bc3d9 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -112,6 +112,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; + /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; @@ -142,6 +145,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e1103e7bb4..02ce3a1591 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -363,8 +363,9 @@ typedef struct SMqConsumerEp { int64_t consumerId; // -1 for unassigned int64_t lastConsumerHbTs; int64_t lastVgHbTs; - int32_t execLen; - SSubQueryMsg qExec; + uint32_t qmsgLen; + char* qmsg; + //SSubQueryMsg qExec; } SMqConsumerEp; static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { @@ -373,7 +374,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); - tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); + //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); + tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen); + tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen); return tlen; } @@ -382,8 +385,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); - pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; + //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); + buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen); + buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen); return buf; } @@ -402,11 +406,12 @@ typedef struct SMqSubscribeObj { static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); - pSub->key[0] = 0; - pSub->epoch = 0; if (pSub == NULL) { return NULL; } + pSub->key[0] = 0; + pSub->epoch = 0; + pSub->availConsumer = taosArrayInit(0, sizeof(int64_t)); if (pSub->availConsumer == NULL) { free(pSub); @@ -433,7 +438,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { free(pSub); return NULL; } - return NULL; + return pSub; } static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 5cdd8e77bd..7591caebc5 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -56,7 +56,9 @@ void mndCleanupConsumer(SMnode *pMnode) {} SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); - SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen); + int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE; + + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) goto CM_ENCODE_OVER; void* buf = malloc(tlen); @@ -68,34 +70,6 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER); - -#if 0 - int32_t topicNum = taosArrayGetSize(pConsumer->topics); - SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER); - int32_t len = strlen(pConsumer->cgroup); - SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER); - SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER); - for (int i = 0; i < topicNum; i++) { - int32_t len; - SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i); - len = strlen(pConsumerTopic->name); - SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER); - int vgSize; - if (pConsumerTopic->vgroups == NULL) { - vgSize = 0; - } else { - vgSize = listNEles(pConsumerTopic->vgroups); - } - SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER); - for (int j = 0; j < vgSize; j++) { - // SList* head; - /*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/ - } - } -#endif - SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER); @@ -116,53 +90,35 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CONSUME_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; if (sver != MND_CONSUMER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto CONSUME_DECODE_OVER; + goto CM_DECODE_OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj)); - if (pRow == NULL) goto CONSUME_DECODE_OVER; + if (pRow == NULL) goto CM_DECODE_OVER; SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); - if (pConsumer == NULL) goto CONSUME_DECODE_OVER; + if (pConsumer == NULL) goto CM_DECODE_OVER; int32_t dataPos = 0; int32_t len; - SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); void* buf = malloc(len); - if (buf == NULL) goto CONSUME_DECODE_OVER; - - SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER); + if (buf == NULL) goto CM_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); - tDecodeSMqConsumerObj(buf, pConsumer); - - SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER); + if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { + goto CM_DECODE_OVER; + } terrno = TSDB_CODE_SUCCESS; -#if 0 - SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER); - for (int i = 0; i < topicNum; i++) { - int32_t topicLen; - SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); - if (pConsumerTopic == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - // TODO - return NULL; - } - /*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/ - SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER); - int32_t vgSize; - SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER); - } -#endif - -CONSUME_DECODE_OVER: - if (terrno != 0) { +CM_DECODE_OVER: + if (terrno != TSDB_CODE_SUCCESS) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); tfree(pRow); return NULL; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9a573cbe2c..78e9a7c17c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); return sdbSetTable(pMnode->pSdb, table); } @@ -96,25 +96,27 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); // build msg - SMqSetCVgReq req = { - .vgId = pCEp->vgId, - .oldConsumerId = -1, - .newConsumerId = consumerId, - }; - strcpy(req.cgroup, cgroup); - strcpy(req.topicName, topic); - strcpy(req.sql, pTopic->sql); - strcpy(req.logicalPlan, pTopic->logicalPlan); - strcpy(req.physicalPlan, pTopic->physicalPlan); - memcpy(&req.msg, &pCEp->qExec, pCEp->execLen); - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + + SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + strcpy(pReq->cgroup, cgroup); + strcpy(pReq->topicName, topic); + pReq->sql = strdup(pTopic->sql); + pReq->logicalPlan = strdup(pTopic->logicalPlan); + pReq->physicalPlan = strdup(pTopic->physicalPlan); + pReq->qmsgLen = pCEp->qmsgLen; + memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); void *reqStr = malloc(tlen); if (reqStr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } void *abuf = reqStr; - tEncodeSMqSetCVgReq(abuf, &req); + tEncodeSMqSetCVgReq(&abuf, pReq); // persist msg STransAction action = {0}; @@ -128,6 +130,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SSdbRaw *pRaw = mndSubActionEncode(pSub); mndTransAppendRedolog(pTrans, pRaw); + free(pReq); tfree(topic); tfree(cgroup); } @@ -146,6 +149,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas //convert phyplan to dag SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SArray *pArray; + SArray* inner = taosArrayGet(pDag->pSubplans, 0); + SSubplan *plan = taosArrayGetP(inner, 0); + plan->execNode.inUse = 0; + strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); + plan->execNode.epAddr[0].port = 6030; + plan->execNode.nodeId = 2; + plan->execNode.numOfEps = 1; + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { return -1; } @@ -157,11 +168,18 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; STaskInfo* pTaskInfo = taosArrayGet(pArray, i); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); + /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; + CEp.qmsgLen = pTaskInfo->msg->contentLen; + CEp.qmsg = malloc(CEp.qmsgLen); + if (CEp.qmsg == NULL) { + return -1; + } + memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen); taosArrayPush(unassignedVg, &CEp); } - qDestroyQueryDag(pDag); + /*qDestroyQueryDag(pDag);*/ return 0; } @@ -178,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume }; strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.topicName, pTopic->name); - strcpy(req.sql, pTopic->sql); - strcpy(req.logicalPlan, pTopic->logicalPlan); - strcpy(req.physicalPlan, pTopic->physicalPlan); + req.sql = pTopic->sql; + req.logicalPlan = pTopic->logicalPlan; + req.physicalPlan = pTopic->physicalPlan; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *reqStr = malloc(tlen); - if (reqStr == NULL) { + void *buf = malloc(sizeof(SMsgHead) + tlen); + if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - void *abuf = reqStr; + + SMsgHead* pMsgHead = (SMsgHead*)buf; + + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = reqStr; + action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_VND_MQ_SET_CONN; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(reqStr); + free(buf); return -1; } } @@ -208,19 +232,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { + terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t tlen = tEncodeSubscribeObj(NULL, pSub); - int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE; + int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); if (pRaw == NULL) goto SUB_ENCODE_OVER; void *buf = malloc(tlen); - if (buf == NULL) { - goto SUB_ENCODE_OVER; - } - void *abuf = buf; + if (buf == NULL) goto SUB_ENCODE_OVER; - tEncodeSubscribeObj(&buf, pSub); + void *abuf = buf; + tEncodeSubscribeObj(&abuf, pSub); int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER); @@ -228,6 +251,8 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER); + terrno = TSDB_CODE_SUCCESS; + SUB_ENCODE_OVER: if (terrno != 0) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); @@ -259,9 +284,9 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t tlen; + SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); void *buf = malloc(tlen + 1); if (buf == NULL) goto SUB_DECODE_OVER; - SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); @@ -269,8 +294,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { goto SUB_DECODE_OVER; } + terrno = TSDB_CODE_SUCCESS; + SUB_DECODE_OVER: - if (terrno != 0) { + if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); // TODO free subscribeobj tfree(pRow); @@ -379,10 +406,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; j++; } else if (j >= oldTopicNum) { - newTopicName = taosArrayGet(newSub, i); + newTopicName = taosArrayGetP(newSub, i); i++; } else { - newTopicName = taosArrayGet(newSub, i); + newTopicName = taosArrayGetP(newSub, i); oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; int comp = compareLenPrefixedStr(newTopicName, oldTopicName); @@ -466,6 +493,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + char* key = mndMakeSubscribeKey(consumerGroup, newTopicName); + strcpy(pSub->key, key); // set unassigned vg mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); //TODO: disable alter @@ -486,7 +515,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } SSdbRaw *pRaw = mndSubActionEncode(pSub); - /*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/ + sdbSetRawStatus(pRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pRaw); #if 0 SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); @@ -519,8 +548,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { mndTransAppendRedolog(pTrans, pTopicRaw); #endif - mndReleaseTopic(pMnode, pTopic); - mndReleaseSubscribe(pMnode, pSub); + /*mndReleaseTopic(pMnode, pTopic);*/ + /*mndReleaseSubscribe(pMnode, pSub);*/ } } // part3. persist consumerObj diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ac66e7d88b..fa043cf7a0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -60,7 +60,9 @@ void mndCleanupTopic(SMnode *pMnode) {} SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE; + int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1; + int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; + int32_t size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); if (pRaw == NULL) goto TOPIC_ENCODE_OVER; @@ -74,12 +76,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); - int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1; - SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); - int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; - SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); @@ -135,7 +135,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; } - SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); pTopic->physicalPlan = calloc(len + 1, sizeof(char)); @@ -144,7 +144,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; } - SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); @@ -231,6 +231,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { } static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { + mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -273,7 +274,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return 0; } else { terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; - mError("db:%s, failed to create since %s", createTopicReq.name, terrstr()); + mError("topic:%s, failed to create since already exists", createTopicReq.name); return -1; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b5a995988..cfe2d6fd69 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -799,9 +799,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { return -1; } strcpy(pTopic->topicName, req.topicName); - strcpy(pTopic->sql, req.sql); - strcpy(pTopic->logicalPlan, req.logicalPlan); - strcpy(pTopic->physicalPlan, req.physicalPlan); + pTopic->sql = strdup(req.sql); + pTopic->logicalPlan = strdup(req.logicalPlan); + pTopic->physicalPlan = strdup(req.physicalPlan); pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -811,9 +811,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); + terrno = TSDB_CODE_SUCCESS; return 0; } @@ -826,7 +827,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->pMsg = NULL; pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; - return NULL; + return pReadHandle; } void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index 6829d778f0..d762844120 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -98,6 +98,8 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { + setThreadName("vnode-commit"); + SVnodeTask* pTask; for (;;) { pthread_mutex_lock(&(vnodeMgr.mutex)); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 169dc32c1c..326d99ddbb 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -115,7 +115,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_SET_CONN: { - if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) { + if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) { + // TODO: handle error } } break; default: diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ebf3d83a1a..b3f5370c0f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -250,9 +250,8 @@ typedef struct SExecTaskInfo { STaskCostInfo cost; int64_t owner; // if it is in execution int32_t code; - + uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - pthread_mutex_t lock; // used to synchronize the rsp/query threads char *sql; // query sql string jmp_buf env; // struct SOperatorInfo *pRoot; @@ -622,8 +621,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t nu int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters); SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code); -SQInfo *createQInfoImpl(SQueryTableReq *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo); int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger); @@ -645,20 +642,18 @@ void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status); bool onlyQueryTags(STaskAttr* pQueryAttr); //void destroyUdfInfo(struct SUdfInfo* pUdfInfo); -bool isValidQInfo(void *param); - int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); -void setQueryKilled(SQInfo *pQInfo); +void setTaskKilled(SExecTaskInfo *pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code); void calculateOperatorProfResults(SQInfo* pQInfo); -void queryCostStatis(SQInfo *pQInfo); +void queryCostStatis(SExecTaskInfo *pTaskInfo); -void doDestroyTask(SQInfo *pQInfo); +void doDestroyTask(SExecTaskInfo *pTaskInfo); void freeQueryAttr(STaskAttr *pQuery); int32_t getMaximumIdleDurationSec(); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index f9e61f91de..97a0557748 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -121,11 +121,19 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat } static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { - if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) { + uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery; + if (taosQueueSize(pDispatcher->pDataBlocks) > capacity) { + qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, + taosQueueSize(pDispatcher->pDataBlocks)); return false; } + pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows; pBuf->pData = malloc(pBuf->allocSize); + if (pBuf->pData == NULL) { + qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); + } + return NULL != pBuf->pData; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2f1f40813c..b7be85dc34 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { return code; } -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) { - if (pMsg == NULL || streamReadHandle == NULL) { +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { + if (msg == NULL || streamReadHandle == NULL) { return NULL; } @@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle #endif struct SSubplan* plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); + int32_t code = qStringToSubplan(msg, &plan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e5d56aca15..b60943d284 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -149,7 +149,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, + qError("QID:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; @@ -160,7 +160,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { } if (isTaskKilled(pTaskInfo)) { - qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); + qDebug("QID:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; } @@ -169,12 +169,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), + qDebug("QID:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; } - qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); + qDebug("QID:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -190,8 +190,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { *useconds = pTaskInfo->cost.elapsedTime; } - qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", - GET_TASKID(pTaskInfo), 0, 0L, 0); + int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; + pTaskInfo->totalRows += current; + + qDebug("QID:0x%" PRIx64 " task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", + GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0); atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; @@ -200,14 +203,14 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { SQInfo *pQInfo = (SQInfo *)qinfo; - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + if (pQInfo == NULL) { qError("QInfo invalid qhandle"); return TSDB_CODE_QRY_INVALID_QHANDLE; } *buildRes = false; if (IS_QUERY_KILLED(pQInfo)) { - qDebug("QInfo:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); + qDebug("QID:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); return pQInfo->code; } @@ -227,11 +230,11 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo assert(pQInfo->rspContext == NULL); if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; - qDebug("QInfo:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize, + qDebug("QID:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize, GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code)); } else { *buildRes = false; - qDebug("QInfo:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId); + qDebug("QID:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId); pQInfo->rspContext = pRspContext; assert(pQInfo->rspContext != NULL); } @@ -251,18 +254,18 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) { } int32_t qKillTask(qTaskInfo_t qinfo) { - SQInfo *pQInfo = (SQInfo *)qinfo; + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId); - setQueryKilled(pQInfo); + qDebug("QID:0x%"PRIx64" execTask killed", pTaskInfo->id.queryId); + setTaskKilled(pTaskInfo); // Wait for the query executing thread being stopped/ // Once the query is stopped, the owner of qHandle will be cleared immediately. - while (pQInfo->owner != 0) { + while (pTaskInfo->owner != 0) { taosMsleep(100); } @@ -270,14 +273,14 @@ int32_t qKillTask(qTaskInfo_t qinfo) { } int32_t qAsyncKillTask(qTaskInfo_t qinfo) { - SQInfo *pQInfo = (SQInfo *)qinfo; + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo; - if (pQInfo == NULL || !isValidQInfo(pQInfo)) { + if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("QInfo:0x%"PRIx64" query async killed", pQInfo->qId); - setQueryKilled(pQInfo); + qDebug("QID:0x%"PRIx64" query async killed", pTaskInfo->id.queryId); + setTaskKilled(pTaskInfo); return TSDB_CODE_SUCCESS; } @@ -292,15 +295,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER); } -void qDestroyTask(qTaskInfo_t qHandle) { - SQInfo* pQInfo = (SQInfo*) qHandle; - if (!isValidQInfo(pQInfo)) { - return; - } +void qDestroyTask(qTaskInfo_t qTaskHandle) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; + qDebug("QID:0x%"PRIx64" execTask completed, numOfRows:%"PRId64, pTaskInfo->id.queryId, pTaskInfo->totalRows); - qDebug("QInfo:0x%"PRIx64" query completed", pQInfo->qId); - queryCostStatis(pQInfo); // print the query cost summary - doDestroyTask(pQInfo); + queryCostStatis(pTaskInfo); // print the query cost summary + doDestroyTask(pTaskInfo); } void* qOpenTaskMgmt(int32_t vgId) { @@ -381,7 +381,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) { STaskMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { - qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); + qError("QID:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } @@ -389,7 +389,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) { pthread_mutex_lock(&pQueryMgmt->lock); if (pQueryMgmt->closed) { pthread_mutex_unlock(&pQueryMgmt->lock); - qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); + qError("QID:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } else { @@ -445,7 +445,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return TSDB_CODE_QRY_INVALID_QHANDLE; } qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); - setQueryKilled(pQInfo); + setTaskKilled(pQInfo); // wait query stop int32_t loop = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bfd1bce499..9a2bb75b34 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2432,9 +2432,9 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { } bool isTaskKilled(SExecTaskInfo *pTaskInfo) { - if (IS_QUERY_KILLED(pTaskInfo)) { - return true; - } +// if (IS_QUERY_KILLED(pTaskInfo)) { +// return true; +// } // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived // abort current query execution. @@ -2444,13 +2444,13 @@ bool isTaskKilled(SExecTaskInfo *pTaskInfo) { assert(pTaskInfo->cost.start != 0); // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); - return true; +// return true; } return false; } -void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} +void setTaskKilled(SExecTaskInfo *pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} //static bool isFixedOutputQuery(STaskAttr* pQueryAttr) { // if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { @@ -4420,33 +4420,32 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { taosArrayDestroy(opStack); } -void queryCostStatis(SQInfo *pQInfo) { - STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - STaskCostInfo *pSummary = &pQInfo->summary; +void queryCostStatis(SExecTaskInfo *pTaskInfo) { + STaskCostInfo *pSummary = &pTaskInfo->cost; - uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); - hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); - pSummary->hashSize = hashSize; +// uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); +// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); +// pSummary->hashSize = hashSize; // add the merge time pSummary->elapsedTime += pSummary->firstStageMergeTime; - SResultRowPool* p = pQInfo->runtimeEnv.pool; - if (p != NULL) { - pSummary->winInfoSize = getResultRowPoolMemSize(p); - pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p); - } else { - pSummary->winInfoSize = 0; - pSummary->numOfTimeWindows = 0; - } - - calculateOperatorProfResults(pQInfo); - - //qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " -// "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, -// pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, -// pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); +// SResultRowPool* p = pTaskInfo->pool; +// if (p != NULL) { +// pSummary->winInfoSize = getResultRowPoolMemSize(p); +// pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p); +// } else { +// pSummary->winInfoSize = 0; +// pSummary->numOfTimeWindows = 0; +// } +// +// calculateOperatorProfResults(pQInfo); + qDebug("QID:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " + "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, + pTaskInfo->id.queryId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, + pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); +// //qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); @@ -7733,7 +7732,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { SExecTaskInfo* pTaskInfo = calloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pthread_mutex_init(&pTaskInfo->lock, NULL); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; return pTaskInfo; @@ -8673,229 +8671,6 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { return ((SQInfo *)qHandle)->qId == qId; } -SQInfo* createQInfoImpl(SQueryTableReq* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, - SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, - char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo) { - int16_t numOfCols = pQueryMsg->numOfCols; - int16_t numOfOutput = pQueryMsg->numOfOutput; - - SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); - if (pQInfo == NULL) { - goto _cleanup_qinfo; - } - - pQInfo->qId = qId; - pQInfo->startExecTs = 0; - - pQInfo->runtimeEnv.pUdfInfo = pUdfInfo; - - // to make sure third party won't overwrite this structure - pQInfo->signature = pQInfo; - STaskAttr* pQueryAttr = &pQInfo->query; - pQInfo->runtimeEnv.pQueryAttr = pQueryAttr; - - pQueryAttr->tableGroupInfo = *pTableGroupInfo; - pQueryAttr->numOfCols = numOfCols; - pQueryAttr->numOfOutput = numOfOutput; - pQueryAttr->limit.limit = pQueryMsg->limit; - pQueryAttr->limit.offset = pQueryMsg->offset; - pQueryAttr->order.order = pQueryMsg->order; - pQueryAttr->order.col.info.colId = pQueryMsg->orderColId; - pQueryAttr->pExpr1 = pExprs; - pQueryAttr->pExpr2 = pSecExprs; - pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput; - pQueryAttr->pGroupbyExpr = pGroupbyExpr; - memcpy(&pQueryAttr->interval, &pQueryMsg->interval, sizeof(pQueryAttr->interval)); - pQueryAttr->fillType = pQueryMsg->fillType; - pQueryAttr->numOfTags = pQueryMsg->numOfTags; - pQueryAttr->tagColList = pTagCols; - pQueryAttr->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit; - pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; -// pQueryAttr->sw = pQueryMsg->sw; - pQueryAttr->vgId = vgId; - - pQueryAttr->stableQuery = pQueryMsg->stableQuery; - pQueryAttr->topBotQuery = pQueryMsg->topBotQuery; - pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn; - pQueryAttr->hasTagResults = pQueryMsg->hasTagResults; - pQueryAttr->timeWindowInterpo = pQueryMsg->timeWindowInterpo; - pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist; - pQueryAttr->stabledev = pQueryMsg->stabledev; - pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery; - pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; - pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; - pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; - pQueryAttr->stateWindow = pQueryMsg->stateWindow; - pQueryAttr->vgId = vgId; -// pQueryAttr->pFilters = pFilters; - - pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); - if (pQueryAttr->tableCols == NULL) { - goto _cleanup; - } - - pQueryAttr->srcRowSize = 0; - pQueryAttr->maxTableColumnWidth = 0; - for (int16_t i = 0; i < numOfCols; ++i) { - pQueryAttr->tableCols[i] = pQueryMsg->tableCols[i]; -// pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters); - - pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes; - if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) { - pQueryAttr->maxTableColumnWidth = pQueryAttr->tableCols[i].bytes; - } - } - - for (int16_t col = 0; col < numOfOutput; ++col) { - assert(pExprs[col].base.resSchema.bytes > 0); - pQueryAttr->resultRowSize += pExprs[col].base.resSchema.bytes; - - // keep the tag length - if (TSDB_COL_IS_TAG(pExprs[col].base.pColumns->flag)) { - pQueryAttr->tagLen += pExprs[col].base.resSchema.bytes; - } - -// if (pExprs[col].base.flist.filterInfo) { -// ++pQueryAttr->havingNum; -// } - } - - doUpdateExprColumnIndex(pQueryAttr); - - if (pSecExprs != NULL) { - int32_t resultRowSize = 0; - - // calculate the result row size - for (int16_t col = 0; col < pQueryAttr->numOfExpr2; ++col) { - assert(pSecExprs[col].base.resSchema.bytes > 0); - resultRowSize += pSecExprs[col].base.resSchema.bytes; - } - - if (resultRowSize > pQueryAttr->resultRowSize) { - pQueryAttr->resultRowSize = resultRowSize; - } - } - - if (pQueryAttr->fillType != TSDB_FILL_NONE) { - pQueryAttr->fillVal = malloc(sizeof(int64_t) * pQueryAttr->numOfOutput); - if (pQueryAttr->fillVal == NULL) { - goto _cleanup; - } - - // the first column is the timestamp - memcpy(pQueryAttr->fillVal, (char *)pQueryMsg->fillVal, pQueryAttr->numOfOutput * sizeof(int64_t)); - } - - size_t numOfGroups = 0; - if (pTableGroupInfo->pGroupList != NULL) { - numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); - STableGroupInfo* pTableqinfo = &pQInfo->runtimeEnv.tableqinfoGroupInfo; - - pTableqinfo->pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); - pTableqinfo->numOfTables = pTableGroupInfo->numOfTables; - pTableqinfo->map = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - } - - pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); - if (pQInfo->pBuf == NULL) { - goto _cleanup; - } - - pQInfo->dataReady = QUERY_RESULT_NOT_READY; - pQInfo->rspContext = NULL; - pQInfo->sql = sql; - pthread_mutex_init(&pQInfo->lock, NULL); - tsem_init(&pQInfo->ready, 0, 0); - - pQueryAttr->window = pQueryMsg->window; - updateDataCheckOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery); - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - STimeWindow window = pQueryAttr->window; - - int32_t index = 0; - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); - - size_t s = taosArrayGetSize(pa); - SArray* p1 = taosArrayInit(s, POINTER_BYTES); - if (p1 == NULL) { - goto _cleanup; - } - - taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1); - - for(int32_t j = 0; j < s; ++j) { -// STableKeyInfo* info = taosArrayGet(pa, j); -// window.skey = info->lastKey; -// -// void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); -// STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf); -// if (item == NULL) { -// goto _cleanup; -// } -// -// item->groupIndex = i; -// taosArrayPush(p1, &item); - -// STableId* id = TSDB_TABLEID(info->pTable); -// taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES); -// index += 1; - } - } - - colIdCheck(pQueryAttr, pQInfo->qId); - -// int32_t functionId = getExprFunctionId(&pExpr[0]); -// pQInfo->query.queryBlockDist = (functionId == FUNCTION_BLKINFO); - - //qDebug("qmsg:%p vgId:%d, QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->query.vgId, pQInfo->qId, pQInfo); - return pQInfo; - -_cleanup_qinfo: -// tsdbDestroyTableGroup(pTableGroupInfo); - - if (pGroupbyExpr != NULL) { - taosArrayDestroy(pGroupbyExpr->columnInfo); - free(pGroupbyExpr); - } - - tfree(pTagCols); - for (int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExprInfo = &pExprs[i]; - if (pExprInfo->pExpr != NULL) { - tExprTreeDestroy(pExprInfo->pExpr, NULL); - pExprInfo->pExpr = NULL; - } - -// if (pExprInfo->base.flist.filterInfo) { -// freeColumnFilterInfo(pExprInfo->base.flist.filterInfo, pExprInfo->base.flist.numOfFilters); -// } - } - - tfree(pExprs); - -// filterFreeInfo(pFilters); - -_cleanup: - doDestroyTask(pQInfo); - return NULL; -} - -bool isValidQInfo(void *param) { - SQInfo *pQInfo = (SQInfo *)param; - if (pQInfo == NULL) { - return false; - } - - /* - * pQInfo->signature may be changed by another thread, so we assign value of signature - * into local variable, then compare by using local variable - */ - uint64_t sig = (uint64_t)pQInfo->signature; - return (sig == (uint64_t)pQInfo); -} - int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger) { int32_t code = TSDB_CODE_SUCCESS; @@ -8957,7 +8732,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* _error: // table query ref will be decrease during error handling - doDestroyTask(pQInfo); +// doDestroyTask(pQInfo); return code; } @@ -9038,36 +8813,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { return NULL; } -void doDestroyTask(SQInfo *pQInfo) { - if (!isValidQInfo(pQInfo)) { - return; - } +void doDestroyTask(SExecTaskInfo *pTaskInfo) { + qDebug("QID:0x%"PRIx64" start to free execTask", pTaskInfo->id.queryId); + doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); +// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); +// taosHashCleanup(pTaskInfo->summary.operatorProfResults); - //qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId); - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); - - doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); - teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - - STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - freeQueryAttr(pQueryAttr); - -// tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo); - - tfree(pQInfo->pBuf); - tfree(pQInfo->sql); - - taosArrayDestroy(pQInfo->summary.queryProfEvents); - taosHashCleanup(pQInfo->summary.operatorProfResults); - - taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); - pQInfo->signature = 0; - - //qDebug("QInfo:0x%"PRIx64" QInfo is freed", pQInfo->qId); - - tfree(pQInfo); + qDebug("QID:0x%"PRIx64" execTask is freed", pTaskInfo->id.queryId); + tfree(pTaskInfo); } int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h index cd84222c65..f53c5ea279 100644 --- a/source/libs/parser/inc/dataBlockMgt.h +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -171,6 +171,8 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs); void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols); void destroyBoundColumnInfo(SParsedDataColInfo* pColList); void destroyBlockArrayList(SArray* pDataBlockList); +void destroyBlockHashmap(SHashObj* pDataBlockHash); + int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 563443d3c3..0a2f2b20f2 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -213,10 +213,11 @@ SQueryStmtInfo *createQueryInfo() { pQueryInfo->slimit.limit = -1; pQueryInfo->slimit.offset = 0; - pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->exprList = calloc(10, POINTER_BYTES); + for(int32_t i = 0; i < 10; ++i) { pQueryInfo->exprList[i] = taosArrayInit(4, POINTER_BYTES); } @@ -232,7 +233,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) { cleanupFieldInfo(&pQueryInfo->fieldsInfo); dropAllExprInfo(pQueryInfo->exprList, 10); - pQueryInfo->exprList = NULL; + + tfree(pQueryInfo->exprList); columnListDestroy(pQueryInfo->colList); pQueryInfo->colList = NULL; @@ -258,10 +260,10 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) { size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream); for (int32_t i = 0; i < numOfUpstream; ++i) { - SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i); - destroyQueryInfoImpl(pUpQueryInfo); - clearAllTableMetaInfo(pUpQueryInfo, false, 0); - tfree(pUpQueryInfo); + SQueryStmtInfo* pDownstream = taosArrayGetP(pQueryInfo->pDownstream, i); + destroyQueryInfoImpl(pDownstream); + clearAllTableMetaInfo(pDownstream, false, 0); + tfree(pDownstream); } destroyQueryInfoImpl(pQueryInfo); @@ -1395,6 +1397,13 @@ int32_t validateFillNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf static void pushDownAggFuncExprInfo(SQueryStmtInfo* pQueryInfo); static void addColumnNodeFromLowerLevel(SQueryStmtInfo* pQueryInfo); +static void freeItemHelper(void* pItem) { + void** p = pItem; + if (*p != NULL) { + tfree(*p); + } +} + int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) { assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0)); @@ -1590,7 +1599,10 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]); extractFunctionDesc(functionList, &pQueryInfo->info); - if ((code = checkForInvalidExpr(pQueryInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { + code = checkForInvalidExpr(pQueryInfo, pMsgBuf); + taosArrayDestroyEx(functionList, freeItemHelper); + + if (code != TSDB_CODE_SUCCESS) { return code; } } @@ -2902,6 +2914,8 @@ int32_t doAddOneProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, S } pQueryInfo->info.projectionQuery = true; + + taosArrayDestroy(pColumnList); return TSDB_CODE_SUCCESS; } @@ -3983,5 +3997,9 @@ int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtI validateSqlNode(p, pQueryInfo, &buf); } + taosArrayDestroy(data.pTableMeta); + taosArrayDestroy(req.pUdf); + taosArrayDestroy(req.pTableName); + return code; } diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index bc4eae7ae9..ece3d5f5eb 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -249,7 +249,7 @@ static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlo } } -void destroyDataBlock(STableDataBlocks* pDataBlock) { +static void destroyDataBlock(STableDataBlocks* pDataBlock) { if (pDataBlock == NULL) { return; } @@ -273,12 +273,29 @@ void destroyBlockArrayList(SArray* pDataBlockList) { size_t size = taosArrayGetSize(pDataBlockList); for (int32_t i = 0; i < size; i++) { - destroyDataBlock(taosArrayGetP(pDataBlockList, i)); + void* p = taosArrayGetP(pDataBlockList, i); + destroyDataBlock(p); } taosArrayDestroy(pDataBlockList); } +void destroyBlockHashmap(SHashObj* pDataBlockHash) { + if (pDataBlockHash == NULL) { + return; + } + + void** p1 = taosHashIterate(pDataBlockHash, NULL); + while (p1) { + STableDataBlocks* pBlocks = *p1; + destroyDataBlock(pBlocks); + + p1 = taosHashIterate(pDataBlockHash, p1); + } + + taosHashCleanup(pDataBlockHash); +} + // data block is disordered, sort it in ascending order void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; @@ -490,7 +507,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t } // the maximum expanded size in byte when a row-wise data is converted to SDataRow format - int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; + int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta); diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index a031199992..cb7df824b0 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -525,10 +525,28 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { tdDestroyKVRowBuilder(&pCxt->tagsBuilder); } +static void destroyDataBlock(STableDataBlocks* pDataBlock) { + if (pDataBlock == NULL) { + return; + } + + tfree(pDataBlock->pData); + if (!pDataBlock->cloned) { + // free the refcount for metermeta + if (pDataBlock->pTableMeta != NULL) { + tfree(pDataBlock->pTableMeta); + } + + destroyBoundColumnInfo(&pDataBlock->boundColumnInfo); + } + tfree(pDataBlock); +} + static void destroyInsertParseContext(SInsertParseContext* pCxt) { destroyInsertParseContextForTable(pCxt); taosHashCleanup(pCxt->pVgroupsHashObj); - taosHashCleanup(pCxt->pTableBlockHashObj); + + destroyBlockHashmap(pCxt->pTableBlockHashObj); destroyBlockArrayList(pCxt->pTableDataBlocks); destroyBlockArrayList(pCxt->pVgDataBlocks); } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 4271aae451..c98f787f0b 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -248,10 +248,15 @@ void qDestroyQuery(SQueryNode* pQueryNode) { if (NULL == pQueryNode) { return; } - if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) { + + int32_t type = nodeType(pQueryNode); + if (type == TSDB_SQL_INSERT || type == TSDB_SQL_CREATE_TABLE) { SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode; taosArrayDestroy(pModifInfo->pDataBlocks); - } - tfree(pQueryNode); + tfree(pQueryNode); + } else if (type == TSDB_SQL_SELECT) { + SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo*) pQueryNode; + destroyQueryInfo(pQueryStmtInfo); + } } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index b8545b7486..a87e138ed0 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -732,18 +732,8 @@ void cleanupFieldInfo(SFieldInfo* pFieldInfo) { return; } - if (pFieldInfo->internalField != NULL) { - size_t num = taosArrayGetSize(pFieldInfo->internalField); - for (int32_t i = 0; i < num; ++i) { -// SInternalField* pfield = taosArrayGet(pFieldInfo->internalField, i); -// if (pfield->pExpr != NULL && pfield->pExpr->pExpr != NULL) { -// sqlExprDestroy(pfield->pExpr); -// } - } - } - taosArrayDestroy(pFieldInfo->internalField); -// tfree(pFieldInfo->final); + tfree(pFieldInfo->final); memset(pFieldInfo, 0, sizeof(SFieldInfo)); } diff --git a/source/libs/parser/src/queryInfoUtil.c b/source/libs/parser/src/queryInfoUtil.c index 1aa8836cb2..9a2ca2da98 100644 --- a/source/libs/parser/src/queryInfoUtil.c +++ b/source/libs/parser/src/queryInfoUtil.c @@ -191,10 +191,12 @@ void destroyExprInfo(SExprInfo* pExprInfo) { for(int32_t i = 0; i < pExprInfo->base.numOfParams; ++i) { taosVariantDestroy(&pExprInfo->base.param[i]); } + + tfree(pExprInfo->base.pColumns); tfree(pExprInfo); } -static void dropOneLevelExprInfo(SArray* pExprInfo) { +void dropOneLevelExprInfo(SArray* pExprInfo) { size_t size = taosArrayGetSize(pExprInfo); for (int32_t i = 0; i < size; ++i) { @@ -239,6 +241,9 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { #endif dst->pExpr = exprdup(src->pExpr); + dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn)); + memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols); + memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param)); for (int32_t j = 0; j < src->base.numOfParams; ++j) { taosVariantAssign(&dst->base.param[j], &src->base.param[j]); diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index e817b764d5..de62f4a2ef 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -265,7 +265,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo* } else { // here we can push down the projection to tablescan operator. pNode->numOfExpr = num; - pNode->pExpr = taosArrayInit(num, POINTER_BYTES); taosArrayAddAll(pNode->pExpr, p); } } @@ -357,7 +356,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { SArray* exprList = taosArrayInit(4, POINTER_BYTES); if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; -// dropAllExprInfo(exprList); exit(-1); } @@ -373,7 +371,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { // 4. add the projection query node SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList); columnListDestroy(tableColumnList); -// dropAllExprInfo(exprList); taosArrayPush(pDownstream, &pNode); } @@ -398,7 +395,8 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) { } static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { - if (pQueryNode->info.type == QNODE_MODIFY) { + int32_t type = nodeType(pQueryNode); + if (type == QNODE_MODIFY) { SDataPayloadInfo* pInfo = pQueryNode->pExtInfo; size_t size = taosArrayGetSize(pInfo->payload); @@ -410,10 +408,16 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { taosArrayDestroy(pInfo->payload); } + if (type == QNODE_STREAMSCAN || type == QNODE_TABLESCAN) { + SQueryTableInfo* pQueryTableInfo = pQueryNode->pExtInfo; + tfree(pQueryTableInfo->tableName); + } + + taosArrayDestroy(pQueryNode->pExpr); + tfree(pQueryNode->pExtInfo); tfree(pQueryNode->pSchema); tfree(pQueryNode->info.name); -// dropAllExprInfo(pQueryNode->pExpr); if (pQueryNode->pChildren != NULL) { int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 9288c240d0..12c0d0780b 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -155,6 +155,16 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si return node; } +static void cleanupPhyNode(SPhyNode* pPhyNode) { + if (pPhyNode == NULL) { + return; + } + + dropOneLevelExprInfo(pPhyNode->pTargets); + tfree(pPhyNode->targetSchema.pSchema); + tfree(pPhyNode); +} + static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) { SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size); @@ -445,3 +455,29 @@ void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyN void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { setExchangSourceNode(templateId, pSource, subplan->pNode); } + +static void destroyDataSinkNode(SDataSink* pSinkNode) { + if (pSinkNode == NULL) { + return; + } + + if (nodeType(pSinkNode) == DSINK_Dispatch) { + SDataDispatcher* pDdSink = (SDataDispatcher*)pSinkNode; + tfree(pDdSink->sink.schema.pSchema); + } + + tfree(pSinkNode); +} + +void qDestroySubplan(SSubplan* pSubplan) { + if (pSubplan == NULL) { + return; + } + + taosArrayDestroy(pSubplan->pChildren); + taosArrayDestroy(pSubplan->pParents); + destroyDataSinkNode(pSubplan->pDataSink); + cleanupPhyNode(pSubplan->pNode); + + tfree(pSubplan); +} diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index cf54fdec85..4fbec534f6 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -87,6 +87,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f static const char* jkPnodeType = "Type"; static int32_t getPnodeTypeSize(cJSON* json) { switch (getNumber(json, jkPnodeType)) { + case OP_StreamScan: case OP_TableScan: case OP_DataBlocksOptScan: case OP_TableSeqScan: @@ -869,6 +870,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { SPhyNode* phyNode = (SPhyNode*)obj; switch (phyNode->info.type) { case OP_TableScan: + case OP_StreamScan: case OP_DataBlocksOptScan: case OP_TableSeqScan: return tableScanNodeFromJson(json, obj); @@ -1121,6 +1123,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { } *str = cJSON_Print(json); + cJSON_Delete(json); + // printf("====Physical plan:====\n"); // printf("%s\n", *str); *len = strlen(*str) + 1; @@ -1187,14 +1191,14 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) { if(pDag == NULL) { return NULL; } - pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "numOfSubplans")); - pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "queryId")); + pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "Number")); + pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "QueryId")); pDag->pSubplans = taosArrayInit(0, sizeof(SArray)); if (pDag->pSubplans == NULL) { free(pDag); return NULL; } - cJSON* pLevels = cJSON_GetObjectItem(pRoot, "pSubplans"); + cJSON* pLevels = cJSON_GetObjectItem(pRoot, "Subplans"); int level = cJSON_GetArraySize(pLevels); for(int i = 0; i < level; i++) { SArray* plansOneLevel = taosArrayInit(0, sizeof(void*)); diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 2fc4c8ea3e..d546925c5f 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -18,25 +18,6 @@ static void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols); -static void destroyDataSinkNode(SDataSink* pSinkNode) { - if (pSinkNode == NULL) { - return; - } - tfree(pSinkNode); -} - -void qDestroySubplan(SSubplan* pSubplan) { - if (pSubplan == NULL) { - return; - } - - taosArrayDestroy(pSubplan->pChildren); - taosArrayDestroy(pSubplan->pParents); - destroyDataSinkNode(pSubplan->pDataSink); - // todo destroy pNode - tfree(pSubplan); -} - void qDestroyQueryDag(struct SQueryDag* pDag) { if (pDag == NULL) { return; @@ -51,6 +32,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { SSubplan* pSubplan = taosArrayGetP(pa, j); qDestroySubplan(pSubplan); } + taosArrayDestroy(pa); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b53fd9ff37..d09f749805 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -492,7 +492,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); if (code) { - QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code); + QW_TASK_ELOG("dsPutDataBlock failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f90fdd6e11..223fa300df 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1490,7 +1490,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; - pMsg->header.vgId = htonl(tInfo.addr.nodeId); + pMsg->header.vgId = tInfo.addr.nodeId; pMsg->sId = schMgmt.sId; pMsg->queryId = plan->id.queryId; @@ -1512,9 +1512,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { info = NULL; _return: - schedulerFreeTaskList(info); - SCH_RET(code); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c760acd52e..c7a23a2482 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,9 +123,9 @@ typedef struct { } SRpcReqContext; typedef struct { - SRpcInfo* pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app + SRpcInfo* pTransInst; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app // struct SRpcConn* pConn; // pConn allocated tmsg_t msgType; // message type uint8_t* pCont; // content provided by app @@ -182,7 +182,7 @@ typedef struct { #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) -#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead) + sizeof(STransDigestMsg)) +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) #define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) #define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) @@ -201,6 +201,7 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); void transConnCtxDestroy(STransConnCtx* ctx); +void transFreeMsg(void* msg); typedef struct SConnBuffer { char* buf; int len; diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 3c58a76a44..2d39a9cea5 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -242,11 +242,14 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + if (pInit->label) tstrncpy(pRpc->label, pInit->label, tListLen(pInit->label)); pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; + if (pRpc->numOfThreads >= 10) { + pRpc->numOfThreads = 10; + } } else { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } @@ -769,8 +772,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, sid, - hashstr, pConn->spi); + tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, + sid, hashstr, pConn->spi); } return pConn; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bfadfe56ef..f265acf8c1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -30,6 +30,7 @@ typedef struct SCliConn { char spi; char secured; uint64_t expireTime; + int8_t notifyCount; // timers already notify to client } SCliConn; typedef struct SCliMsg { @@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); -// process data read from server, auth/decompress etc later -static void clientHandleResp(SCliConn* conn); // check whether already read complete packet from server static bool clientReadComplete(SConnBuffer* pBuf); // alloc buf for read @@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void clientMsgDestroy(SCliMsg* pMsg); +// process data read from server, auth/decompress etc later +static void clientHandleResp(SCliConn* conn); +// handle except about conn +static void clientHandleExcept(SCliConn* conn); // handle req from app static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientMsgDestroy(SCliMsg* pMsg); +static void destroyTransConnCtx(STransConnCtx* ctx); // thread obj static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); @@ -100,22 +104,50 @@ static void* clientThread(void* arg); static void clientHandleResp(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; - SRpcInfo* pRpc = pCtx->pRpc; - SRpcMsg rpcMsg; + SRpcInfo* pRpc = pCtx->pTransInst; - rpcMsg.pCont = conn->readBuf.buf; - rpcMsg.contLen = conn->readBuf.len; + STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + + SRpcMsg rpcMsg; + rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); + rpcMsg.pCont = transContFromHead(pHead); + rpcMsg.code = pHead->code; + rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; + (pRpc->cfp)(NULL, &rpcMsg, NULL); + conn->notifyCount += 1; SCliThrdObj* pThrd = conn->hostThrd; + tfree(conn->data); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + + // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } - free(pCtx->ip); - free(pCtx); - // impl + destroyTransConnCtx(pCtx); +} +static void clientHandleExcept(SCliConn* pConn) { + SCliMsg* pMsg = pConn->data; + + STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pRpc = pCtx->pTransInst; + + transFreeMsg((pMsg->msg.pCont)); + pMsg->msg.pCont = NULL; + + SRpcMsg rpcMsg = {0}; + rpcMsg.ahandle = pCtx->ahandle; + rpcMsg.code = -1; + // SRpcInfo* pRpc = pMsg->ctx->pRpc; + (pRpc->cfp)(NULL, &rpcMsg, NULL); + tfree(pConn->data); + pConn->notifyCount += 1; + destroyTransConnCtx(pCtx); + clientConnDestroy(pConn, true); } static void clientTimeoutCb(uv_timer_t* handle) { @@ -191,6 +223,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + conn->notifyCount = 0; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -246,19 +279,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - tDebug("alread read complete"); + tDebug("conn %p read complete", conn); clientHandleResp(conn); } else { - tDebug("read half packet, continue to read"); + tDebug("conn %p read partial packet, continue to read", conn); } return; } assert(nread <= 0); if (nread == 0) { + tError("conn %p closed", conn); return; } - if (nread != UV_EOF) { - tDebug("read error %s", uv_err_name(nread)); + if (nread < 0) { + tError("conn %p read error: %s", conn, uv_err_name(nread)); + clientHandleExcept(conn); } // tDebug("Read error %s\n", uv_err_name(nread)); // uv_close((uv_handle_t*)handle, clientDestroy); @@ -282,19 +317,24 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; + + SCliMsg* pMsg = pConn->data; + transFreeMsg((pMsg->msg.pCont)); + pMsg->msg.pCont = NULL; + if (status == 0) { - tDebug("data already was written on stream"); + tDebug("conn %p data already was written out", pConn); } else { - tError("failed to write: %s", uv_err_name(status)); - clientConnDestroy(pConn, true); + tError("conn %p failed to write: %s", pConn, uv_err_name(status)); + clientHandleExcept(pConn); return; } SCliThrdObj* pThrd = pConn->hostThrd; - if (pConn->stream == NULL) { - pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); - pConn->stream->data = pConn; - } + // if (pConn->stream == NULL) { + // pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + // uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); + // pConn->stream->data = pConn; + //} uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); // impl later } @@ -310,30 +350,19 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen); + tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; - SCliMsg* pMsg = pConn->data; - - STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pRpc = pCtx->pRpc; - if (status != 0) { // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("failed to connect server, errmsg: %s", uv_strerror(status)); - // call user fp later - SRpcMsg rpcMsg; - rpcMsg.ahandle = pCtx->ahandle; - // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pRpc->cfp)(NULL, &rpcMsg, NULL); - - clientConnDestroy(pConn, true); - // uv_close((uv_handle_t*)req->handle, clientDestroy); + tError("conn %p failed to connect server: %s", pConn, uv_strerror(status)); + clientHandleExcept(pConn); return; } + tDebug("conn %p create", pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -349,6 +378,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later + tDebug("conn %p get from conn pool", conn); conn->data = pMsg; conn->writeReq->data = conn; @@ -462,6 +492,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { free(pThrd->loop); free(pThrd); } + +static void destroyTransConnCtx(STransConnCtx* ctx) { + if (ctx != NULL) { + free(ctx->ip); + } + free(ctx); +} // void taosCloseClient(void* arg) { // impl later @@ -472,7 +509,6 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } - void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]); @@ -487,7 +523,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pRpc = (SRpcInfo*)shandle; + pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 617abeea39..5bece11bec 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -191,4 +191,11 @@ void transConnCtxDestroy(STransConnCtx* ctx) { free(ctx->ip); free(ctx); } + +void transFreeMsg(void* msg) { + if (msg == NULL) { + return; + } + free((char*)msg - sizeof(STransMsgHead)); +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index b519a35f24..c70b1a5b28 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -16,6 +16,7 @@ #ifdef USE_UV #include "transComm.h" + typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; @@ -26,7 +27,6 @@ typedef struct SConn { int ref; int persist; // persist connection or not SConnBuffer connBuf; // read buf, - int count; int inType; void* pTransInst; // rpc init void* ahandle; // @@ -226,7 +226,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvProcessData(SConn* pConn) { +static void uvHandleReq(SConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; SConnBuffer* pBuf = &pConn->connBuf; @@ -271,6 +271,7 @@ static void uvProcessData(SConn* pConn) { rpcMsg.ahandle = NULL; rpcMsg.handle = pConn; + pConn->ref++; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->connBuf; if (nread > 0) { pBuf->len += nread; - tDebug("on read %p, total read: %d, current read: %d", cli, pBuf->len, (int)nread); + tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); if (readComplete(pBuf)) { - tDebug("alread read complete packet"); - uvProcessData(conn); + tDebug("conn %p alread read complete packet", conn); + uvHandleReq(conn); } else { - tDebug("read half packet, continue to read"); + tDebug("conn %p read partial packet, continue to read", conn); } return; } if (nread == 0) { + tDebug("conn %p except read", conn); + // destroyConn(conn, true); return; } if (nread != UV_EOF) { - tDebug("read error %s", uv_err_name(nread)); + tDebug("conn %p read error: %s", conn, uv_err_name(nread)); + destroyConn(conn, true); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b void uvOnTimeoutCb(uv_timer_t* handle) { // opt - tDebug("time out"); + SConn* pConn = handle->data; + tDebug("conn %p time out", pConn); } void uvOnWriteCb(uv_write_t* req, int status) { @@ -316,10 +321,14 @@ void uvOnWriteCb(uv_write_t* req, int status) { buf->len = 0; memset(buf->buf, 0, buf->cap); buf->left = -1; + + SRpcMsg* pMsg = &conn->sendMsg; + transFreeMsg(pMsg->pCont); + if (status == 0) { - tDebug("data already was written on stream"); + tDebug("conn %p data already was written on stream", conn); } else { - tDebug("failed to write data, %s", uv_err_name(status)); + tDebug("conn %p failed to write data, %s", conn, uv_err_name(status)); destroyConn(conn, true); } // opt @@ -334,7 +343,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { // impl later; - tDebug("prepare to send back"); + tDebug("conn %p prepare to send resp", conn); SRpcMsg* pMsg = &conn->sendMsg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); @@ -427,6 +436,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(pending == UV_TCP); SConn* pConn = createConn(); + pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); @@ -448,7 +458,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); - tDebug("new connection created: %d", fd); + tDebug("conn %p created, fd: %d", pConn, fd); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); } else { tDebug("failed to create new connection"); @@ -515,19 +525,19 @@ void* workerThread(void* arg) { static SConn* createConn() { SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); + ++pConn->ref; return pConn; } -static void connCloseCb(uv_handle_t* handle) { - // impl later - // -} + static void destroyConn(SConn* conn, bool clear) { if (conn == NULL) { return; } + if (--conn->ref == 0) { + return; + } if (clear) { - uv_handle_t handle = *((uv_handle_t*)conn->pTcp); - uv_close(&handle, NULL); + uv_close((uv_handle_t*)conn->pTcp, NULL); } uv_timer_stop(conn->pTimer); free(conn->pTimer); @@ -646,6 +656,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { pthread_mutex_lock(&pThrd->connMtx); QUEUE_PUSH(&pThrd->conn, &pConn->queue); pthread_mutex_unlock(&pThrd->connMtx); + tDebug("conn %p start to send resp", pConn); uv_async_send(pConn->pWorkerAsync); } diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index fd3496cc17..84814f39fc 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -63,7 +63,7 @@ static void *sendRequest(void *param) { if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); // tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); - tDebug("recv response"); + tDebug("recv response succefully"); // usleep(100000000); }