From affa8a7ef275b7304cfc7f7d321d0dc5b3971234 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 21 Jan 2022 22:51:59 +0800 Subject: [PATCH 1/4] refactor rpc --- source/libs/transport/src/transCli.c | 35 +++++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 00bc1b621f..bf395d62e5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,6 +17,8 @@ #include "transComm.h" +#define CONN_PERSIST_TIME(para) (para * 1000 * 100) + typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; @@ -102,6 +104,9 @@ static void clientProcessData(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { + uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + } free(pCtx->ip); free(pCtx); // impl @@ -112,6 +117,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; + tDebug("timeout, try to remove expire conn from connCache"); SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); while (p != NULL) { @@ -128,8 +134,8 @@ static void clientTimeoutCb(uv_timer_t* handle) { p = taosHashIterate((SHashObj*)pThrd->cache, p); } - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); + uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } static void* connCacheCreate(int size) { SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -158,8 +164,7 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { SConnList* plist = taosHashGet(pCache, key, strlen(key)); if (plist == NULL) { SConnList list; - plist = &list; - taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list)); plist = taosHashGet(pCache, key, strlen(key)); QUEUE_INIT(&plist->conn); } @@ -177,7 +182,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; - conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); // list already create before assert(plist != NULL); @@ -374,14 +379,13 @@ static void clientAsyncCb(uv_async_t* handle) { clientHandleReq(pMsg, pThrd); count++; if (count >= 2) { - tError("send batch size: %d", count); + tDebug("send batch size: %d", count); } } } static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -409,8 +413,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->pTimer); pThrd->pTimer->data = pThrd; - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->cache = connCacheCreate(1); pThrd->pTransInst = shandle; @@ -426,16 +430,19 @@ static void clientMsgDestroy(SCliMsg* pMsg) { // impl later free(pMsg); } +static void destroyThrdObj(SCliThrdObj* pThrd) { + pthread_join(pThrd->thread, NULL); + pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->cliAsync); + free(pThrd->loop); + free(pThrd); +} +// void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* pThrd = cli->pThreadObj[i]; - pthread_join(pThrd->thread, NULL); - pthread_mutex_destroy(&pThrd->msgMtx); - free(pThrd->cliAsync); - free(pThrd->loop); - free(pThrd); + destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); free(cli); From 069d715fe7b2a6dad495063d1f42e88730e4a0e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jan 2022 23:30:03 +0800 Subject: [PATCH 2/4] [td-11818]update log, fix bug in select * from super_table. --- include/common/tmsg.h | 4 +- include/libs/planner/planner.h | 2 +- source/client/inc/clientInt.h | 1 + source/client/src/clientImpl.c | 66 +++++++++++------------ source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/physicalPlan.c | 26 ++++++--- source/libs/planner/src/planner.c | 20 ++++++- source/libs/planner/test/phyPlanTests.cpp | 6 +-- source/libs/qworker/inc/qworkerInt.h | 18 +++---- source/libs/scheduler/inc/schedulerInt.h | 10 ++-- source/libs/scheduler/src/scheduler.c | 15 ++---- 11 files changed, 93 insertions(+), 77 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b468456cb7..fe75663ab5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -696,13 +696,13 @@ typedef struct SVgroupInfo { uint32_t hashEnd; int8_t inUse; int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SVgroupInfo; typedef struct { int32_t vgId; int8_t numOfEps; - SEpAddr epAddr[TSDB_MAX_REPLICA]; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SVgroupMsg; typedef struct { diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index ba3539e64e..126cee390c 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -178,7 +178,7 @@ struct SQueryNode; * @param requestId * @return */ -int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId); + int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId); // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 1d10869e30..296fbba634 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -136,6 +136,7 @@ typedef struct SReqResultInfo { TAOS_ROW row; char **pCol; uint32_t numOfRows; + uint64_t totalRows; uint32_t current; bool completed; } SReqResultInfo; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 159a92b0ab..80184a2346 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -25,9 +25,9 @@ static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); - static bool stringLengthCheck(const char* str, size_t maxsize) { +static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { return false; } @@ -59,7 +59,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i } static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); -static void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema); +static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { if (taos_init() != TSDB_CODE_SUCCESS) { @@ -202,43 +202,38 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { return TSDB_CODE_SUCCESS; } -int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) { +int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) { pRequest->type = pQueryNode->type; - SReqResultInfo* pResInfo = &pRequest->body.resInfo; - int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId); + SSchema* pSchema = NULL; + int32_t numOfCols = 0; + int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId); if (code != 0) { return code; } if (pQueryNode->type == TSDB_SQL_SELECT) { - SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0); - - SSubplan* pPlan = taosArrayGetP(pa, 0); - SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema); - setResSchemaInfo(pResInfo, pDataBlockSchema); - + setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols); pRequest->type = TDMT_VND_QUERY; } return code; } -void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) { - assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0); +void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) { + assert(pSchema != NULL && numOfCols > 0); - pResInfo->numOfCols = pDataBlockSchema->numOfCols; - pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0])); + pResInfo->numOfCols = numOfCols; + pResInfo->fields = calloc(numOfCols, sizeof(pSchema[0])); for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { - SSchema* pSchema = &pDataBlockSchema->pSchema[i]; - pResInfo->fields[i].bytes = pSchema->bytes; - pResInfo->fields[i].type = pSchema->type; - tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name)); + pResInfo->fields[i].bytes = pSchema[i].bytes; + pResInfo->fields[i].type = pSchema[i].type; + tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); } } -int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { +int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; @@ -256,14 +251,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { return pRequest->code; } - SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); - - SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2}; - addr.epAddr[0].port = 7100; - strcpy(addr.epAddr[0].fqdn, "localhost"); - - taosArrayPush(execNode, &addr); - return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob); + return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob); } typedef struct tmq_t tmq_t; @@ -399,7 +387,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, // todo check for invalid sql statement and return with error code - CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); + SSchema *schema = NULL; + int32_t numOfCols = 0; + CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return); pStr = qDagToString(pRequest->body.pDag); if(pStr == NULL) { @@ -492,6 +482,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { SRequestObj *pRequest = NULL; SQueryNode *pQueryNode = NULL; + SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); @@ -500,8 +491,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { if (qIsDdlQuery(pQueryNode)) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return); } else { - CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return); - CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return); + CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return); + CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return); pRequest->code = terrno; } @@ -719,13 +710,17 @@ void* doFetchRow(SRequestObj* pRequest) { return NULL; } - int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); + SReqResultInfo* pResInfo = &pRequest->body.resInfo; + int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData); if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; return NULL; } - setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); + setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); + tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows, + pResInfo->totalRows, pResInfo->completed, pRequest->requestId); + if (pResultInfo->numOfRows == 0) { return NULL; } @@ -855,7 +850,7 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { pthread_mutex_unlock(&pTscObj->mutex); } -void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { +void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { assert(pResultInfo != NULL && pRsp != NULL); pResultInfo->pRspMsg = (const char*) pRsp; @@ -864,5 +859,6 @@ void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* p pResultInfo->current = 0; pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->totalRows += pResultInfo->numOfRows; setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); } diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 41f50607cb..26ae44a08f 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); */ int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); -int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId); void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index dd869d87b3..0038c51c7a 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -261,14 +261,14 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { return; } -static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { - SVgroupsInfo* pVgroupList = pTable->pMeta->vgroupList; +static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) { + SVgroupsInfo* pVgroupList = pTableInfo->pMeta->vgroupList; for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) { STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); subplan->msgType = TDMT_VND_QUERY; - vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode); - subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); + vgroupMsgToEpSet(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode); + subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode); RECOVERY_CURRENT_SUBPLAN(pCxt); } @@ -384,18 +384,19 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { subplan->msgType = TDMT_VND_QUERY; subplan->pNode = createPhyNode(pCxt, pRoot); - subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode); + subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode); } // todo deal subquery } -int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) { +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) { TRY(TSDB_MAX_TAG_CONDITIONS) { SPlanContext context = { .pCatalog = pCatalog, .pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pCurrentSubplan = NULL, - .nextId = {.queryId = requestId}, + //The unsigned Id starting from 1 would be better + .nextId = {.queryId = requestId, .subplanId = 1, .templateId = 1}, }; *pDag = context.pDag; @@ -408,6 +409,17 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD terrno = code; return TSDB_CODE_FAILED; } END_TRY + + // traverse the dag again to acquire the execution node. + if (pNodeList != NULL) { + SArray** pSubLevel = taosArrayGetLast((*pDag)->pSubplans); + size_t num = taosArrayGetSize(*pSubLevel); + for (int32_t j = 0; j < num; ++j) { + SSubplan* pPlan = taosArrayGetP(*pSubLevel, j); + taosArrayPush(pNodeList, &pPlan->execNode); + } + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 9b32213ad7..c93569a6c1 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -16,6 +16,8 @@ #include "parser.h" #include "plannerInt.h" +static void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols); + static void destroyDataSinkNode(SDataSink* pSinkNode) { if (pSinkNode == NULL) { return; @@ -56,7 +58,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { tfree(pDag); } -int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) { +int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId) { SQueryPlanNode* pLogicPlan; int32_t code = createQueryPlan(pNode, &pLogicPlan); if (TSDB_CODE_SUCCESS != code) { @@ -76,17 +78,31 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, return code; } - code = createDag(pLogicPlan, NULL, pDag, requestId); + code = createDag(pLogicPlan, NULL, pDag, pNodeList, requestId); if (TSDB_CODE_SUCCESS != code) { destroyQueryPlan(pLogicPlan); qDestroyQueryDag(*pDag); return code; } + extractResSchema(pDag, pResSchema, numOfCols); + destroyQueryPlan(pLogicPlan); return TSDB_CODE_SUCCESS; } +void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, + int32_t* numOfCols) { // extract the final result schema + SArray* pTopSubplan = taosArrayGetP((*pDag)->pSubplans, 0); + + SSubplan* pPlan = taosArrayGetP(pTopSubplan, 0); + SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema); + + *numOfCols = pDataBlockSchema->numOfCols; + *pResSchema = calloc(pDataBlockSchema->numOfCols, sizeof(SSchema)); + memcpy((*pResSchema), pDataBlockSchema->pSchema, pDataBlockSchema->numOfCols * sizeof(SSchema)); +} + void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { setSubplanExecutionNode(subplan, templateId, pSource); } diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index 6d9e08e829..edf5fa5a81 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -61,7 +61,7 @@ protected: int32_t run() { SQueryDag* dag = nullptr; uint64_t requestId = 20; - int32_t code = createDag(logicPlan_.get(), nullptr, &dag, requestId); + int32_t code = createDag(logicPlan_.get(), nullptr, &dag, NULL, requestId); dag_.reset(dag); return code; } @@ -78,9 +78,9 @@ protected: SQueryDag* dag = nullptr; uint64_t requestId = 20; SSchema *schema = NULL; - uint32_t numOfOutput = 0; + int32_t numOfOutput = 0; - code = qCreateQueryDag(query, &dag, requestId); + code = qCreateQueryDag(query, &dag, &schema, &numOfOutput, nullptr, requestId); dag_.reset(dag); return code; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 5f9b33f7e3..3a8f34e831 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -171,17 +171,17 @@ typedef struct SQWorkerMgmt { #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) -#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) -#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) -#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) +#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) +#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) +#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) -#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 6b047eb96e..2ab519424b 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -146,12 +146,12 @@ typedef struct SSchJob { #define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY) #define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) -#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__) -#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__) +#define SCH_JOB_ELOG(param, ...) qError("QID:0x%"PRIx64" " param, pJob->queryId, __VA_ARGS__) +#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%"PRIx64" " param, pJob->queryId, __VA_ARGS__) -#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) -#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) -#define SCH_TASK_WLOG(param, ...) qWarn("QID:%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) +#define SCH_TASK_ELOG(param, ...) qError("QID:0x%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) +#define SCH_TASK_DLOG(param, ...) qDebug("QID:0x%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) +#define SCH_TASK_WLOG(param, ...) qWarn("QID:0x%"PRIx64",TID:%"PRId64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ddfa73f0a5..7b9396bcb9 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -650,15 +650,11 @@ _return: SCH_RET(code); } - int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - tsem_post(&job->rspSem); } - - // Note: no more error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { bool needRetry = false; @@ -882,7 +878,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } SCH_ERR_JRET(schProcessOnDataFetched(pJob)); - break; } case TDMT_VND_DROP_TASK: { @@ -892,7 +887,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } default: SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -935,8 +929,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in } pTask = *task; - - SCH_TASK_DLOG("rsp msg received, type:%d, code:%x", msgType, rspCode); + SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); @@ -1562,8 +1555,8 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; + int32_t code = 0; atomic_add_fetch_32(&pJob->ref, 1); int8_t status = SCH_GET_JOB_STATUS(pJob); @@ -1609,7 +1602,6 @@ _return: while (true) { *pData = atomic_load_ptr(&pJob->res); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) { continue; } @@ -1628,8 +1620,7 @@ _return: atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); - SCH_JOB_DLOG("fetch done, code:%x", code); - + SCH_JOB_DLOG("fetch done, code:%s", tstrerror(code)); atomic_sub_fetch_32(&pJob->ref, 1); SCH_RET(code); From a4aa98dafec75d4754e182724e062313e21d91f5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 22 Jan 2022 11:19:31 +0800 Subject: [PATCH 3/4] [td-11818] add table filter uid. --- source/libs/executor/src/executorimpl.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d9b3f5f4a9..e963c49c86 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5408,7 +5408,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt return pOperator; } -SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, uint64_t uid, SExecTaskInfo* pTaskInfo) { SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5428,6 +5428,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp // set the extract column id to streamHandle tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); + tqReadHandleSetTbUid(streamReadHandle, uid); pInfo->readerHandle = streamReadHandle; @@ -7719,7 +7720,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); } else if (pPhyNode->info.type == OP_StreamScan) { size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); - return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pTaskInfo); + SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. + return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo); } } From 046794047b9fea16ab465ca6eb88d0aa1413fcc9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 22 Jan 2022 12:09:34 +0800 Subject: [PATCH 4/4] refactor uv code --- source/libs/transport/src/transCli.c | 102 +++++++++++++++------------ source/libs/transport/src/transSrv.c | 82 ++++++++++++++------- 2 files changed, 115 insertions(+), 69 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bf395d62e5..ffd8d35bfc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -44,7 +44,7 @@ typedef struct SCliThrdObj { uv_loop_t* loop; uv_async_t* cliAsync; // uv_timer_t* pTimer; - void* cache; // conn pool + void* pool; // conn pool queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout @@ -65,10 +65,10 @@ typedef struct SConnList { // conn pool // add expire timeout and capacity limit -static void* connCacheCreate(int size); -static void* connCacheDestroy(void* cache); -static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); -static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +static void* connPoolCreate(int size); +static void* connPoolDestroy(void* pool); +static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); +static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); @@ -90,8 +90,14 @@ static void clientConnDestroy(SCliConn* pConn); static void clientMsgDestroy(SCliMsg* pMsg); +// thread obj +static SCliThrdObj* createThrdObj(); +static void destroyThrdObj(SCliThrdObj* pThrd); +// thread static void* clientThread(void* arg); +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); + static void clientProcessData(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; SRpcInfo* pRpc = pCtx->pRpc; @@ -103,7 +109,7 @@ static void clientProcessData(SCliConn* conn) { (pRpc->cfp)(NULL, &rpcMsg, NULL); SCliThrdObj* pThrd = conn->hostThrd; - addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } @@ -111,15 +117,14 @@ static void clientProcessData(SCliConn* conn) { free(pCtx); // impl } -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tDebug("timeout, try to remove expire conn from connCache"); + tDebug("timeout, try to remove expire conn from conn pool"); - SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); + SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { while (!QUEUE_IS_EMPTY(&p->conn)) { queue* h = QUEUE_HEAD(&p->conn); @@ -131,18 +136,18 @@ static void clientTimeoutCb(uv_timer_t* handle) { break; } } - p = taosHashIterate((SHashObj*)pThrd->cache, p); + p = taosHashIterate((SHashObj*)pThrd->pool, p); } pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -static void* connCacheCreate(int size) { - SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - return cache; +static void* connPoolCreate(int size) { + SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + return pool; } -static void* connCacheDestroy(void* cache) { - SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); +static void* connPoolDestroy(void* pool) { + SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); @@ -150,22 +155,22 @@ static void* connCacheDestroy(void* cache) { SCliConn* c = QUEUE_DATA(h, SCliConn, conn); clientConnDestroy(c); } - connList = taosHashIterate((SHashObj*)cache, connList); + connList = taosHashIterate((SHashObj*)pool, connList); } - taosHashClear(cache); + taosHashClear(pool); } -static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { +static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { char key[128] = {0}; tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - SHashObj* pCache = cache; - SConnList* plist = taosHashGet(pCache, key, strlen(key)); + SHashObj* pPool = pool; + SConnList* plist = taosHashGet(pPool, key, strlen(key)); if (plist == NULL) { SConnList list; - taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pCache, key, strlen(key)); + taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pPool, key, strlen(key)); QUEUE_INIT(&plist->conn); } @@ -176,14 +181,14 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { QUEUE_REMOVE(h); return QUEUE_DATA(h, SCliConn, conn); } -static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { +static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { char key[128] = {0}; tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -327,7 +332,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; - SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port); + SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later conn->data = pMsg; @@ -378,9 +383,9 @@ static void clientAsyncCb(uv_async_t* handle) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); clientHandleReq(pMsg, pThrd); count++; - if (count >= 2) { - tDebug("send batch size: %d", count); - } + } + if (count >= 2) { + tDebug("already process batch size: %d", count); } } @@ -398,24 +403,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); - - QUEUE_INIT(&pThrd->msg); - pthread_mutex_init(&pThrd->msgMtx, NULL); - - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - - pThrd->cliAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); - pThrd->cliAsync->data = pThrd; - - pThrd->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pThrd->pTimer); - pThrd->pTimer->data = pThrd; - + SCliThrdObj* pThrd = createThrdObj(); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - pThrd->cache = connCacheCreate(1); pThrd->pTransInst = shandle; int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); @@ -430,7 +419,30 @@ static void clientMsgDestroy(SCliMsg* pMsg) { // impl later free(pMsg); } +static SCliThrdObj* createThrdObj() { + SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + pThrd->cliAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); + pThrd->cliAsync->data = pThrd; + + pThrd->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->pTimer); + pThrd->pTimer->data = pThrd; + + pThrd->pool = connPoolCreate(1); + return pThrd; +} static void destroyThrdObj(SCliThrdObj* pThrd) { + if (pThrd == NULL) { + return; + } pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->cliAsync); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 77b5f635f4..dafb809e2a 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -91,10 +91,14 @@ static SConn* connCreate(); static void connDestroy(SConn* conn); static void uvConnDestroy(uv_handle_t* handle); -// server worke thread +// server and worker thread static void* workerThread(void* arg); static void* acceptThread(void* arg); +// add handle loop +static bool addHandleToWorkloop(void* arg); +static bool addHandleToAcceptloop(void* arg); + void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* * formate of data buffer: @@ -268,7 +272,7 @@ static void uvProcessData(SConn* pConn) { rpcMsg.handle = pConn; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); + // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type } @@ -451,22 +455,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void* acceptThread(void* arg) { // opt SServerObj* srv = (SServerObj*)arg; - uv_tcp_init(srv->loop, &srv->server); - - struct sockaddr_in bind_addr; - - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); - return NULL; - } uv_run(srv->loop, UV_RUN_DEFAULT); } -static void initWorkThrdObj(SWorkThrdObj* pThrd) { +static bool addHandleToWorkloop(void* arg) { + SWorkThrdObj* pThrd = arg; pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); + if (0 != uv_loop_init(pThrd->loop)) { + return false; + } // SRpcInfo* pRpc = pThrd->shandle; uv_pipe_init(pThrd->loop, pThrd->pipe, 1); @@ -482,6 +478,31 @@ static void initWorkThrdObj(SWorkThrdObj* pThrd) { pThrd->workerAsync->data = pThrd; uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + return true; +} + +static bool addHandleToAcceptloop(void* arg) { + // impl later + SServerObj* srv = arg; + + int err = 0; + if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) { + tError("failed to init accept server: %s", uv_err_name(err)); + return false; + } + + struct sockaddr_in bind_addr; + + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { + tError("failed to bind: %s", uv_err_name(err)); + return false; + } + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { + tError("failed to listen: %s", uv_err_name(err)); + return false; + } + return true; } void* workerThread(void* arg) { SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; @@ -546,11 +567,12 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - return NULL; + goto End; } uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write @@ -559,7 +581,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read - initWorkThrdObj(thrd); + if (false == addHandleToWorkloop(thrd)) { + goto End; + } int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { tDebug("sucess to create worker-thread %d", i); @@ -568,9 +592,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, // TODO: clear all other resource later tError("failed to create worker-thread %d", i); } - srv->pThreadObj[i] = thrd; } - + if (false == addHandleToAcceptloop(srv)) { + goto End; + } int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); @@ -579,16 +604,25 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return srv; +End: + taosCloseServer(srv); + return NULL; +} + +void destroyWorkThrd(SWorkThrdObj* pThrd) { + if (pThrd == NULL) { + return; + } + pthread_join(pThrd->thread, NULL); + // free(srv->pipe[i]); + free(pThrd->loop); + free(pThrd); } void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { - SWorkThrdObj* pThrd = srv->pThreadObj[i]; - pthread_join(pThrd->thread, NULL); - free(srv->pipe[i]); - free(pThrd->loop); - free(pThrd); + destroyWorkThrd(srv->pThreadObj[i]); } free(srv->loop); free(srv->pipe);