diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index f069b68286..ba3539e64e 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -114,10 +114,16 @@ typedef struct SProjectPhyNode { SPhyNode node; } SProjectPhyNode; +typedef struct SDownstreamSource { + SQueryNodeAddr addr; + uint64_t taskId; + uint64_t schedId; +} SDownstreamSource; + typedef struct SExchangePhyNode { SPhyNode node; - uint64_t srcTemplateId; // template id of datasource suplans - SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode + uint64_t srcTemplateId; // template id of datasource suplans + SArray *pSrcEndPoints; // SArray, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhyNode; typedef enum EAggAlgo { @@ -178,7 +184,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan // @ep one execution location of this group of datasource subplans -void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); +void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 1f369067d6..3262b9437c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -114,6 +114,14 @@ void schedulerDestroy(void); */ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks); +/** + * make one task info's multiple copies + * @param src + * @param dst SArray** + * @return + */ +int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum); + void schedulerFreeTaskList(SArray *taskList); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 
93e38113ce..e93577e620 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -361,6 +361,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping") #define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation") #define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error") +#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired") diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index df0f060fbe..8fdde9f4e9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -259,9 +259,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr)); - SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 0}; + SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 1}; addr.epAddr[0].port = 6030; - strcpy(addr.epAddr[0].fqdn, "ubuntu"); + strcpy(addr.epAddr[0].fqdn, "localhost"); taosArrayPush(execNode, &addr); return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 255b000d10..beb26572d0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -375,9 +375,12 @@ typedef struct STaskParam { } STaskParam; typedef struct SExchangeInfo { - int32_t numOfSources; - SEpSet *pEpset; - int32_t bytes; // total load bytes from remote + SArray *pSources; + int32_t bytes; // total load bytes from remote + tsem_t ready; + void *pTransporter; + SRetrieveTableRsp *pRsp; + SSDataBlock *pResult; } SExchangeInfo; typedef struct STableScanInfo { @@ -545,7 +548,7 @@ typedef struct SOrderOperatorInfo { void 
appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); -SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index df28972645..9fad5242ba 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -79,42 +79,43 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ int32_t tableType = 0; SPhyNode *pPhyNode = pSubplan->pNode; - if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) { + STableGroupInfo groupInfo = {0}; + + int32_t type = pPhyNode->info.type; + if (type == OP_TableScan || type == OP_DataBlocksOptScan) { STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode; - uid = pTableScanNode->scan.uid; - window = pTableScanNode->window; + uid = pTableScanNode->scan.uid; + window = pTableScanNode->window; tableType = pTableScanNode->scan.tableType; - } else { - assert(0); + + if (tableType == TSDB_SUPER_TABLE) { + code = + tsdbQuerySTableByTagCond(tsdb, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, pSubplan->id.queryId); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } else { // Create one table group. 
+ groupInfo.numOfTables = 1; + groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + + SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); + + STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid}; + taosArrayPush(pa, &info); + taosArrayPush(groupInfo.pGroupList, &pa); + } + + if (groupInfo.numOfTables == 0) { + code = 0; + // qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId); + goto _error; + } } - STableGroupInfo groupInfo = {0}; - if (tableType == TSDB_SUPER_TABLE) { - code = tsdbQuerySTableByTagCond(tsdb, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, pSubplan->id.queryId); + code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); if (code != TSDB_CODE_SUCCESS) { goto _error; } - } else { // Create one table group. - groupInfo.numOfTables = 1; - groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); - - SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); - - STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid}; - taosArrayPush(pa, &info); - taosArrayPush(groupInfo.pGroupList, &pa); - } - - if (groupInfo.numOfTables == 0) { - code = 0; -// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId); - goto _error; - } - - code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; code = dsDataSinkMgtInit(&cfg); @@ -182,7 +183,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { // todo: remove it. 
if (tinfo == NULL) { - return NULL; + return TSDB_CODE_SUCCESS; } *pRes = NULL; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9daf3298e4..1b3c93ab61 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4914,7 +4914,41 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { } int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { + SExchangeInfo* pEx = (SExchangeInfo*) param; + pEx->pRsp = pMsg->pData; + pEx->pRsp->numOfRows = htonl(pEx->pRsp->numOfRows); + pEx->pRsp->useconds = htobe64(pEx->pRsp->useconds); + pEx->pRsp->compLen = htonl(pEx->pRsp->compLen); + + tsem_post(&pEx->ready); +} + +static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { + assert(pMsgBody != NULL); + tfree(pMsgBody->msgInfo.pData); + tfree(pMsgBody); +} + +void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { + SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; + assert(pMsg->ahandle != NULL); + + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; + + if (pMsg->contLen > 0) { + buf.pData = calloc(1, pMsg->contLen); + if (buf.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + memcpy(buf.pData, pMsg->pCont, pMsg->contLen); + } + } + + pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); + rpcFreeCont(pMsg->pCont); + destroySendMsgInfo(pSendInfo); } static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { @@ -4925,46 +4959,66 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { *newgroup = false; - SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { // todo handle malloc error - + if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) { + return NULL; } - SEpSet epSet; + SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { // todo handle malloc error + } - int64_t sId = -1, 
queryId = 0, taskId = 1, vgId = 1; - pMsg->header.vgId = htonl(vgId); + SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0); + SEpSet epSet = {0}; - pMsg->sId = htobe64(sId); - pMsg->taskId = htobe64(taskId); - pMsg->queryId = htobe64(queryId); + epSet.numOfEps = pSource->addr.numOfEps; + epSet.port[0] = pSource->addr.epAddr[0].port; + tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); + + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", queryId, taskId, (int32_t)sizeof(SMsgSendInfo)); + qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); } - pMsgSendInfo->param = NULL; + pMsgSendInfo->param = pExchangeInfo; pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); pMsgSendInfo->msgType = TDMT_VND_FETCH; pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - void* pTransporter = NULL; - int32_t code = asyncSendMsgToServer(pTransporter, &epSet, &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); - printf("abc\n"); - getchar(); + tsem_wait(&pExchangeInfo->ready); - // add it into the sink node + if (pExchangeInfo->pRsp->numOfRows == 0) { + return NULL; + } + SSDataBlock* pRes = pExchangeInfo->pResult; + char* pData = pExchangeInfo->pRsp->data; + + for(int32_t i = 0; i < pOperator->numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows); + if (tmp == NULL) { + // todo + } + + size_t len = 
pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes; + memcpy(tmp, pData, len); + pData += len; + } + + return pExchangeInfo->pResult; } -SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { - assert(numOfSources > 0); - +SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -4975,18 +5029,57 @@ SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t n return NULL; } - pInfo->numOfSources = numOfSources; + pInfo->pSources = taosArrayDup(pSources); + assert(taosArrayGetSize(pInfo->pSources) > 0); + + size_t size = taosArrayGetSize(pExprInfo); + pInfo->pResult = calloc(1, sizeof(SSDataBlock)); + pInfo->pResult->pDataBlock = taosArrayInit(pOperator->numOfOutput, sizeof(SColumnInfoData)); + + SArray* pResult = pInfo->pResult->pDataBlock; + for(int32_t i = 0; i < size; ++i) { + SColumnInfoData colInfoData = {0}; + SExprInfo* p = taosArrayGetP(pExprInfo, i); + + SSchema* pSchema = &p->base.resSchema; + colInfoData.info.type = pSchema->type; + colInfoData.info.colId = pSchema->colId; + colInfoData.info.bytes = pSchema->bytes; + + taosArrayPush(pResult, &colInfoData); + } pOperator->name = "ExchangeOperator"; pOperator->operatorType = OP_Exchange; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfOutput = size; pOperator->pRuntimeEnv = NULL; pOperator->exec = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; + { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "TSC"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processMsgFromServer; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + 
rpcInit.user = (char *)"root"; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; +// rpcInit.spi = 1; + rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; + + pInfo->pTransporter = rpcOpen(&rpcInit); + if (pInfo->pTransporter == NULL) { + return NULL; // todo + } + } + return pOperator; } @@ -5016,7 +5109,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = NULL; pOperator->exec = doTableScan; pOperator->pTaskInfo = pTaskInfo; @@ -5049,7 +5141,6 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = NULL; pOperator->exec = doTableScan; pOperator->pTaskInfo = pTaskInfo; @@ -7363,6 +7454,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); return createDataBlocksOptScanInfo(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + } else if (pPhyNode->info.type == OP_Exchange) { + SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; + return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); } else { assert(0); } @@ -7372,32 +7466,35 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle) { STsdbQueryCond cond = {.loadExternalRows = false}; + tsdbReadHandleT tsdbReadHandle = NULL; + SPhyNode* pPhyNode = pPlan->pNode; if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) { - - STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) 
pPhyNode; - cond.order = pTableScanNode->scan.order; + STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode; + cond.order = pTableScanNode->scan.order; cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets); - cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); - cond.twindow = pTableScanNode->window; - cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); + cond.twindow = pTableScanNode->window; + cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; - for(int32_t i = 0; i < cond.numOfCols; ++i) { + for (int32_t i = 0; i < cond.numOfCols; ++i) { SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i); assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE); SSchema* pSchema = pExprInfo->pExpr->pSchema; - cond.colList[i].type = pSchema->type; + cond.colList[i].type = pSchema->type; cond.colList[i].bytes = pSchema->bytes; cond.colList[i].colId = pSchema->colId; } + + *pTaskInfo = createExecTaskInfo((uint64_t) pPlan->id.queryId); + tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, pGroupInfo, (*pTaskInfo)->id.queryId, NULL); + } else if (pPhyNode->info.type == OP_Exchange) { + *pTaskInfo = createExecTaskInfo((uint64_t) pPlan->id.queryId); } else { assert(0); } - *pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId); - tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, pGroupInfo, (*pTaskInfo)->id.queryId, NULL); - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle); if ((*pTaskInfo)->pRoot == NULL) { return terrno; diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index be266bd415..4ff8364198 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t 
createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); -void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); +void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 93f72bba95..ae058c1f85 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ -#include "function.h" #include "os.h" #include "parser.h" +#include "function.h" #include "plannerInt.h" typedef struct SFillEssInfo { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 422233eed7..1e1a97df1f 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -278,7 +278,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) { SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode)); node->srcTemplateId = srcTemplateId; - node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr))); + node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource))); return (SPhyNode*)node; } @@ -409,24 +409,25 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_SUCCESS; } -void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) { +void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) { if (NULL == pNode) { return; } if (OP_Exchange == 
pNode->info.type) { SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode; if (templateId == pExchange->srcTemplateId) { - taosArrayPush(pExchange->pSrcEndPoints, pEp); + taosArrayPush(pExchange->pSrcEndPoints, pSource); } } + if (pNode->pChildren != NULL) { size_t size = taosArrayGetSize(pNode->pChildren); for(int32_t i = 0; i < size; ++i) { - setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i)); + setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i)); } } } -void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) { - setExchangSourceNode(templateId, pEp, subplan->pNode); +void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { + setExchangSourceNode(templateId, pSource, subplan->pNode); } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 9aeae55c4f..1b6cf89c56 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -729,40 +729,75 @@ static bool epAddrFromJson(const cJSON* json, void* obj) { return true; } -static const char* jkNodeAddrId = "NodeId"; -static const char* jkNodeAddrInUse = "InUse"; -static const char* jkNodeAddrEpAddrs = "EpAddrs"; +static const char* jkNodeAddrId = "NodeId"; +static const char* jkNodeAddrInUse = "InUse"; +static const char* jkNodeAddrEpAddrs = "Ep"; +static const char* jkNodeAddr = "NodeAddr"; +static const char* jkNodeTaskId = "TaskId"; +static const char* jkNodeTaskSchedId = "SchedId"; + +static bool queryNodeAddrToJson(const void* obj, cJSON* json) { + const SQueryNodeAddr* pAddr = (const SQueryNodeAddr*) obj; + bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId); + + if (res) { + res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->inUse); + } + + if (res) { + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epAddr, sizeof(SEpAddr), pAddr->numOfEps); + } + 
return res; +} + +static bool queryNodeAddrFromJson(const cJSON* json, void* obj) { + SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj; + + pAddr->nodeId = getNumber(json, jkNodeAddrId); + pAddr->inUse = getNumber(json, jkNodeAddrInUse); + + int32_t numOfEps = 0; + bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epAddr, sizeof(SEpAddr), &numOfEps); + pAddr->numOfEps = numOfEps; + return res; +} static bool nodeAddrToJson(const void* obj, cJSON* json) { - const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj; - bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId); + const SDownstreamSource* pSource = (const SDownstreamSource*) obj; + bool res = cJSON_AddNumberToObject(json, jkNodeTaskId, pSource->taskId); + if (res) { - res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse); + char t[30] = {0}; + snprintf(t, tListLen(t), "%"PRIu64, pSource->schedId); + res = cJSON_AddStringToObject(json, jkNodeTaskSchedId, t); } + if (res) { - res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, sizeof(SEpAddr), ep->numOfEps); + res = addObject(json, jkNodeAddr, queryNodeAddrToJson, &pSource->addr); } return res; } static bool nodeAddrFromJson(const cJSON* json, void* obj) { - SQueryNodeAddr* ep = (SQueryNodeAddr*)obj; - ep->nodeId = getNumber(json, jkNodeAddrId); - ep->inUse = getNumber(json, jkNodeAddrInUse); - int32_t numOfEps = 0; - bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps); - ep->numOfEps = numOfEps; + SDownstreamSource* pSource = (SDownstreamSource*)obj; + pSource->taskId = getNumber(json, jkNodeTaskId); + + char* pSchedId = getString(json, jkNodeTaskSchedId); + pSource->schedId = strtoll(pSchedId, NULL, 10); + tfree(pSchedId); + + bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true); return res; } static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId"; -static const char* jkExchangeNodeSrcEndPoints = 
"SrcEndPoints"; +static const char* jkExchangeNodeSrcEndPoints = "SrcAddrs"; static bool exchangeNodeToJson(const void* obj, cJSON* json) { const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); if (res) { - res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints); + res = addRawArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints->pData, sizeof(SDownstreamSource), taosArrayGetSize(exchange->pSrcEndPoints)); } return res; } @@ -770,7 +805,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) { SExchangePhyNode* exchange = (SExchangePhyNode*)obj; exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); - return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr)); + return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SDownstreamSource)); } static bool specificPhyNodeToJson(const void* obj, cJSON* json) { @@ -979,7 +1014,11 @@ static const char* jkIdSubplanId = "SubplanId"; static bool subplanIdToJson(const void* obj, cJSON* jId) { const SSubplanId* id = (const SSubplanId*)obj; - bool res = cJSON_AddNumberToObject(jId, jkIdQueryId, id->queryId); + + char ids[40] = {0}; + snprintf(ids, tListLen(ids), "%"PRIu64, id->queryId); + + bool res = cJSON_AddStringToObject(jId, jkIdQueryId, ids); if (res) { res = cJSON_AddNumberToObject(jId, jkIdTemplateId, id->templateId); } @@ -991,7 +1030,11 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) { static bool subplanIdFromJson(const cJSON* json, void* obj) { SSubplanId* id = (SSubplanId*)obj; - id->queryId = getNumber(json, jkIdQueryId); + + char* queryId = getString(json, jkIdQueryId); + id->queryId = strtoll(queryId, NULL, 0); + 
tfree(queryId); + id->templateId = getNumber(json, jkIdTemplateId); id->subplanId = getNumber(json, jkIdSubplanId); return true; diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index bf815b26b2..6b3f37741e 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -87,8 +87,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, return TSDB_CODE_SUCCESS; } -void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { - setSubplanExecutionNode(subplan, templateId, ep); +void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { + setSubplanExecutionNode(subplan, templateId, pSource); } int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ffb602bd36..45c6936b62 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,12 +20,38 @@ static SSchedulerMgmt schMgmt = {0}; +uint64_t schGenTaskId(void) { + return atomic_add_fetch_64(&schMgmt.taskId, 1); +} + +uint64_t schGenUUID(void) { + static uint64_t hashId = 0; + static int32_t requestSerialId = 0; + + if (hashId == 0) { + char uid[64]; + int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } + } + + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + + uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + return id; +} + int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, 
JOB_TASK_STATUS_NOT_START); - pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + pTask->taskId = schGenTaskId(); pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->execAddrs) { SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM); @@ -40,8 +66,7 @@ void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } - // TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE - //tfree(pTask->msg); + tfree(pTask->msg); if (pTask->children) { taosArrayDestroy(pTask->children); @@ -71,7 +96,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); @@ -141,7 +166,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_DROPPING: - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); break; default: @@ -541,12 +566,9 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b return TSDB_CODE_SUCCESS; } - - -// Note: no more error processing, handled in function internal -int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { +int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing - SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); if (errCode) { atomic_store_32(&pJob->errCode, errCode); @@ -563,6 +585,17 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, 
int32_t errCode) { +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); +} + +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { + SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode)); +} + + // Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; @@ -749,7 +782,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); - qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->succeedAddr); + SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; + qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source); SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(par)) { @@ -820,7 +854,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SResReadyRsp *rsp = (SResReadyRsp *)msg; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code)); + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -834,9 +868,19 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } + if (pJob->res) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); + tfree(rsp); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + atomic_store_ptr(&pJob->res, rsp); atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows); - + + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); + } + 
SCH_ERR_JRET(schProcessOnDataFetched(pJob)); break; @@ -871,7 +915,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); if (NULL == job || NULL == (*job)) { qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } pJob = *job; @@ -880,13 +924,13 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t s = taosHashGetSize(pJob->execTasks); if (s <= 0) { - qError("QID:%"PRIx64",TID:%"PRIx64" no task in execTask list", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("QID:%"PRIx64",TID:%"PRIx64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -1033,7 +1077,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_VND_CREATE_TABLE: case TDMT_VND_SUBMIT: { msgSize = pTask->msgLen; - msg = pTask->msg; + msg = calloc(1, msgSize); + if (NULL == msg) { + SCH_TASK_ELOG("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + memcpy(msg, pTask->msg, msgSize); break; } @@ -1430,13 +1480,13 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSubQueryMsg *pMsg = msg; + SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; pMsg->header.vgId = htonl(tInfo.addr.nodeId); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(plan->id.queryId); - pMsg->taskId = 
htobe64(atomic_add_fetch_64(&schMgmt.taskId, 1)); + pMsg->taskId = htobe64(schGenUUID()); pMsg->contentLen = htonl(msgLen); memcpy(pMsg->msg, msg, msgLen); @@ -1459,6 +1509,52 @@ _return: SCH_RET(code); } +int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { + if (NULL == src || NULL == src->msg || NULL == dst || copyNum <= 0) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + + *dst = taosArrayInit(copyNum, sizeof(STaskInfo)); + if (NULL == *dst) { + qError("taosArrayInit %d taskInfo failed", copyNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + int32_t msgSize = ntohl(src->msg->contentLen) + sizeof(*src->msg); /* contentLen is kept in network byte order (written with htonl when the task message is built) */ + STaskInfo info = {0}; + + info.addr = src->addr; + + for (int32_t i = 0; i < copyNum; ++i) { + info.msg = malloc(msgSize); + if (NULL == info.msg) { + qError("malloc %d failed", msgSize); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + memcpy(info.msg, src->msg, msgSize); + + info.msg->taskId = htobe64(schGenUUID()); /* every copy gets its own task id, stored big-endian like the source message */ + + if (NULL == taosArrayPush(*dst, &info)) { + qError("taosArrayPush failed, idx:%d", i); + free(info.msg); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + return TSDB_CODE_SUCCESS; + +_return: + + schedulerFreeTaskList(*dst); + *dst = NULL; + + SCH_RET(code); +} + int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { @@ -1466,33 +1562,29 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { } int32_t code = 0; + atomic_add_fetch_32(&pJob->ref, 1); + int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { SCH_JOB_ELOG("job is dropping, status:%d", status); - return TSDB_CODE_SCH_STATUS_ERROR; + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_add_fetch_32(&pJob->ref, 1); - if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); - atomic_sub_fetch_32(&pJob->ref, 1); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } if
(atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); - atomic_sub_fetch_32(&pJob->ref, 1); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - if (status == JOB_TASK_STATUS_FAILED) { - *pData = atomic_load_ptr(&pJob->res); - atomic_store_ptr(&pJob->res, NULL); + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - *pData = atomic_load_ptr(&pJob->res); - atomic_store_ptr(&pJob->res, NULL); + SCH_JOB_ELOG("job already succeed, status:%d", status); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(pJob)); @@ -1502,15 +1594,17 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { status = SCH_GET_JOB_STATUS(pJob); - if (status == JOB_TASK_STATUS_FAILED) { - code = atomic_load_32(&pJob->errCode); - SCH_ERR_JRET(code); + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { + SCH_JOB_ELOG("job failed or dropping, status:%d", status); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } +_return: + while (true) { *pData = atomic_load_ptr(&pJob->res); @@ -1521,10 +1615,19 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { break; } -_return: + if (NULL == *pData) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + if (rsp) { + rsp->completed = 1; + } + + *pData = rsp; + } atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + SCH_JOB_DLOG("fetch done, code:%x", code); + atomic_sub_fetch_32(&pJob->ref, 1); SCH_RET(code); @@ -1549,6 +1652,7 @@ void scheduleFreeJob(void *job) { SSchJob *pJob = job; 
uint64_t queryId = pJob->queryId; + bool setJobFree = false; if (SCH_GET_JOB_STATUS(pJob) > 0) { if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { @@ -1556,8 +1660,6 @@ void scheduleFreeJob(void *job) { return; } - schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); - SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); while (true) { @@ -1565,6 +1667,11 @@ void scheduleFreeJob(void *job) { if (0 == ref) { break; } else if (ref > 0) { + if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) { + schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED); + setJobFree = true; + } + usleep(1); } else { assert(0); @@ -1600,6 +1707,7 @@ void scheduleFreeJob(void *job) { taosHashCleanup(pJob->succTasks); taosArrayDestroy(pJob->levels); + taosArrayDestroy(pJob->nodeList); tfree(pJob->res); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 5332c6fcd1..1425ac0e6c 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -38,6 +38,20 @@ namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode); + +struct SSchJob *pInsertJob = NULL; +struct SSchJob *pQueryJob = NULL; + +uint64_t schtMergeTemplateId = 0x4; +uint64_t schtFetchTaskId = 0; +uint64_t schtQueryId = 1; + +bool schtTestStop = false; +bool schtTestDeadLoop = false; +int32_t schtTestMTRunSec = 10; +int32_t schtTestPrintNum = 1000; +int32_t schtStartFetch = 0; void schtInitLogFile() { @@ -57,7 +71,7 @@ void schtInitLogFile() { void schtBuildQueryDag(SQueryDag *dag) { - uint64_t qId = 0x0000000000000001; + uint64_t qId = schtQueryId; dag->queryId = qId; dag->numOfSubplans = 2; @@ -84,7 +98,7 @@ void schtBuildQueryDag(SQueryDag 
*dag) { scanPlan->msgType = TDMT_VND_QUERY; mergePlan->id.queryId = qId; - mergePlan->id.templateId = 0x4444444444; + mergePlan->id.templateId = schtMergeTemplateId; mergePlan->id.subplanId = 0x5555555555; mergePlan->type = QUERY_TYPE_MERGE; mergePlan->level = 0; @@ -173,8 +187,6 @@ void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int6 } - - void schtSetPlanToString() { static Stub stub; stub.set(qSubPlanToString, schtPlanToString); @@ -214,7 +226,12 @@ void schtSetRpcSendRequest() { } } -int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { +int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { + if (pInfo) { + tfree(pInfo->param); + tfree(pInfo->msgInfo.pData); + free(pInfo); + } return 0; } @@ -269,15 +286,224 @@ void *schtCreateFetchRspThread(void *param) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); rsp->completed = 1; rsp->numOfRows = 10; - code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0); + + code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); assert(code == 0); } +void *schtFetchRspThread(void *aa) { + SDataBuf dataBuf = {0}; + SSchCallbackParam* param = NULL; + + while (!schtTestStop) { + if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) { + continue; + } + + usleep(1); + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + + param->queryId = schtQueryId; + param->taskId = schtFetchTaskId; + + int32_t code = 0; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + rsp->completed = 1; + rsp->numOfRows = 10; + + dataBuf.pData = rsp; + dataBuf.len = sizeof(*rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0); + + assert(code == 0 || code); + } +} + +void schtFreeQueryJob(int32_t freeThread) { + 
static uint32_t freeNum = 0; + SSchJob *job = atomic_load_ptr(&pQueryJob); + + if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) { + scheduleFreeJob(job); + if (freeThread) { + if (++freeNum % schtTestPrintNum == 0) { + printf("FreeNum:%d\n", freeNum); + } + } + } +} + +void* schtRunJobThread(void *aa) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + SQueryDag dag = {0}; + + schtInitLogFile(); + + + int32_t code = schedulerInit(NULL); + assert(code == 0); + + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + SSchJob *job = NULL; + SSchCallbackParam *param = NULL; + SHashObj *execTasks = NULL; + SDataBuf dataBuf = {0}; + uint32_t jobFinished = 0; + + while (!schtTestStop) { + schtBuildQueryDag(&dag); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job); + assert(code == 0); + + execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + schtFetchTaskId = task->taskId - 1; + + taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task)); + pIter = taosHashIterate(job->execTasks, pIter); + } + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pQueryJob = job; + + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId; + SQueryTableRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + assert(code == 0 || code); + + pIter = 
taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId; + SResReadyRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId - 1; + SQueryTableRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId - 1; + SResReadyRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + atomic_store_32(&schtStartFetch, 1); + + void *data = NULL; + code = scheduleFetchRows(pQueryJob, &data); + assert(code == 0 || code); + + if (0 == code) { + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + assert(pRsp->completed == 1); + assert(pRsp->numOfRows == 10); + } + + data = NULL; + code = scheduleFetchRows(pQueryJob, &data); + assert(code == 0 || code); + + schtFreeQueryJob(0); + + taosHashCleanup(execTasks); + + schtFreeQueryDag(&dag); + + if (++jobFinished % schtTestPrintNum == 0) { + printf("jobFinished:%d\n", jobFinished); + } + + 
++schtQueryId; + } + + schedulerDestroy(); + +} + +void* schtFreeJobThread(void *aa) { + while (!schtTestStop) { + usleep(rand() % 100); + schtFreeQueryJob(1); + } +} -struct SSchJob *pInsertJob = NULL; } TEST(queryTest, normalCase) { @@ -368,11 +594,12 @@ TEST(queryTest, normalCase) { SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; ASSERT_EQ(pRsp->completed, 1); ASSERT_EQ(pRsp->numOfRows, 10); + tfree(data); data = NULL; code = scheduleFetchRows(job, &data); ASSERT_EQ(code, 0); - ASSERT_EQ(data, (void*)NULL); + ASSERT_TRUE(data); scheduleFreeJob(pJob); @@ -383,7 +610,6 @@ TEST(queryTest, normalCase) { - TEST(insertTest, normalCase) { void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; @@ -427,13 +653,29 @@ TEST(insertTest, normalCase) { } TEST(multiThread, forceFree) { + pthread_attr_t thattr; + pthread_attr_init(&thattr); - schtInitLogFile(); + pthread_t thread1, thread2, thread3; + pthread_create(&(thread1), &thattr, schtRunJobThread, NULL); + pthread_create(&(thread2), &thattr, schtFreeJobThread, NULL); + pthread_create(&(thread3), &thattr, schtFetchRspThread, NULL); + while (true) { + if (schtTestDeadLoop) { + sleep(1); + } else { + sleep(schtTestMTRunSec); + break; + } + } + + schtTestStop = true; + sleep(3); } - int main(int argc, char** argv) { + srand(time(NULL)); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h new file mode 100644 index 0000000000..792200639a --- /dev/null +++ b/source/libs/transport/inc/transComm.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +#ifdef USE_UV + +#include <uv.h> +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" +#include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmd5.h" +#include "tmempool.h" +#include "tmsg.h" +#include "transportInt.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + +typedef void* queue[2]; +/* Private macros. */ +#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0])) +#define QUEUE_PREV(q) (*(queue**)&((*(q))[1])) + +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) +/* Initialize an empty queue. */ +#define QUEUE_INIT(q) \ + { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } + +/* Return true if the queue has no element. */ +#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q)) + +/* Insert an element at the back of a queue. */ +#define QUEUE_PUSH(q, e) \ + { \ + QUEUE_NEXT(e) = (q); \ + QUEUE_PREV(e) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(e) = (e); \ + QUEUE_PREV(q) = (e); \ + } + +/* Remove the given element from the queue. Any element can be removed at any * + * time. */ +#define QUEUE_REMOVE(e) \ + { \ + QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ + QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ + } + +/* Return the element at the front of the queue. */ +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +/* Return the element at the back of the queue. */ +#define QUEUE_TAIL(q) (QUEUE_PREV(q)) + +/* Iterate over the element of a queue. * Mutating the queue while iterating + * results in undefined behavior.
*/ +#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) + +/* Return the structure holding the given element. */ +#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) + +typedef struct { + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + SEpSet* pSet; // for synchronous API + char msg[0]; // RpcHead starts from here +} SRpcReqContext; + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) + +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) +#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) +#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) +#define rpcIsReq(type) (type & 1U) + +int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); + +#endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index f93753cfe9..e39e0d9273 100644 --- a/source/libs/transport/inc/transportInt.h +++ 
b/source/libs/transport/inc/transportInt.h @@ -16,62 +16,61 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ +#ifdef USE_UV +#include +#endif +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" #include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmsg.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + #ifdef __cplusplus extern "C" { #endif #ifdef USE_UV -#include -typedef void *queue[2]; +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -/* Private macros. */ -#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) -#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) +typedef struct { + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; + uint16_t localPort; + int8_t connType; + int64_t index; + char label[TSDB_LABEL_LEN]; -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key -/* Initialize an empty queue. */ -#define QUEUE_INIT(q) \ - { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); + int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); -/* Return true if the queue has no element. */ -#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) - -/* Insert an element at the back of a queue. 
*/ -#define QUEUE_PUSH(q, e) \ - { \ - QUEUE_NEXT(e) = (q); \ - QUEUE_PREV(e) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(e) = (e); \ - QUEUE_PREV(q) = (e); \ - } - -/* Remove the given element from the queue. Any element can be removed at any * - * time. */ -#define QUEUE_REMOVE(e) \ - { \ - QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ - QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ - } - -/* Return the element at the front of the queue. */ -#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) - -/* Return the element at the back of the queue. */ -#define QUEUE_TAIL(q) (QUEUE_PREV(q)) - -/* Iterate over the element of a queue. * Mutating the queue while iterating - * results in undefined behavior. */ -#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) - -/* Return the structure holding the given element. */ -#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field)))) + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization + pthread_mutex_t mutex; +} SRpcInfo; #endif // USE_LIBUV diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c new file mode 100644 index 0000000000..89361b13ad --- /dev/null +++ b/source/libs/transport/src/trans.c @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#ifdef USE_UV + +#include "transComm.h" + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + +void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { + taosInitServer, taosInitClient}; + +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { + return NULL; + } + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); /* bound the copy by the destination buffer, not by the source length */ + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->connType = pInit->connType; + pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + + return pRpc; +} +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { + int size = contLen + RPC_MSG_OVERHEAD; + + char* start = (char*)calloc(1, (size_t)size); + if (start == NULL) { + tError("failed to malloc msg, size:%d", size); + return NULL; + } else { + tTrace("malloc mem:%p size:%d", start, size); + } + return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); +} +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } + +int32_t rpcInit(void) { + // impl later + return -1; +} + +void rpcCleanup(void) { + // impl later + return; +} +#endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c new file mode 100644 index 0000000000..29f3361b10 --- /dev/null +++ b/source/libs/transport/src/transCli.c @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2019 TAOS
Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV + +#include "transComm.h" + +typedef struct SCliConn { + uv_connect_t connReq; + uv_stream_t* stream; + void* data; + queue conn; +} SCliConn; +typedef struct SCliMsg { + SRpcReqContext* context; + queue q; +} SCliMsg; + +typedef struct SCliThrdObj { + pthread_t thread; + uv_loop_t* loop; + uv_async_t* cliAsync; // + void* cache; // conn pool + queue msg; + pthread_mutex_t msgMtx; + void* shandle; +} SCliThrdObj; + +typedef struct SClientObj { + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SCliThrdObj** pThreadObj; +} SClientObj; + +static void clientWriteCb(uv_write_t* req, int status); +static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void clientConnCb(struct uv_connect_s* req, int status); +static void clientAsyncCb(uv_async_t* handle); + +static void* clientThread(void* arg); + +static void clientWriteCb(uv_write_t* req, int status) { + // impl later +} +static void clientFailedCb(uv_handle_t* handle) { + // impl later + tDebug("close handle"); +} +static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // impl later +} +static void clientConnCb(struct uv_connect_s* req, int status) { + SCliConn* pConn = req->data; + SCliMsg* pMsg = pConn->data; + SEpSet* pEpSet = &pMsg->context->epSet; + + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; + uint32_t port = pEpSet->port[pEpSet->inUse]; + if (status != 0) { + // call 
user fp later + tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); + uv_close((uv_handle_t*)req->handle, clientFailedCb); + return; + } + assert(pConn->stream == req->handle); + + // impl later +} + +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { + // impl later + return NULL; +} +static void clientAsyncCb(uv_async_t* handle) { + SCliThrdObj* pThrd = handle->data; + SCliMsg* pMsg = NULL; + pthread_mutex_lock(&pThrd->msgMtx); + if (!QUEUE_IS_EMPTY(&pThrd->msg)) { + queue* head = QUEUE_HEAD(&pThrd->msg); + pMsg = QUEUE_DATA(head, SCliMsg, q); + QUEUE_REMOVE(head); + } + pthread_mutex_unlock(&pThrd->msgMtx); + + SEpSet* pEpSet = &pMsg->context->epSet; + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; + uint32_t port = pEpSet->port[pEpSet->inUse]; + + SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); + if (conn != NULL) { + // impl later + } else { + SCliConn* conn = malloc(sizeof(SCliConn)); + + conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + + conn->connReq.data = conn; + conn->data = pMsg; + + struct sockaddr_in addr; + uv_ip4_addr(fqdn, port, &addr); + // handle error in callback if connect error + uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + } + + // SRpcReqContext* pCxt = pMsg->context; + + // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont); + // char* msg = (char*)pHead; + // int len = rpcMsgLenFromCont(pCtx->contLen); + // tmsg_t msgType = pCtx->msgType; + + // impl later +} + +static void* clientThread(void* arg) { + SCliThrdObj* pThrd = (SCliThrdObj*)arg; + uv_run(pThrd->loop, UV_RUN_DEFAULT); +} + +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SClientObj* cli = calloc(1, sizeof(SClientObj)); + memcpy(cli->label, label, strlen(label)); + cli->numOfThreads = numOfThreads; + cli->pThreadObj = 
(SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); + + for (int i = 0; i < cli->numOfThreads; i++) { + SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); + + // QUEUE_INIT(&pThrd->clientCache); + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + pThrd->cliAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); + pThrd->cliAsync->data = pThrd; + + pThrd->shandle = shandle; + int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); + if (err == 0) { + tDebug("sucess to create tranport-client thread %d", i); + } + cli->pThreadObj[i] = pThrd; + } + return cli; +} + +void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { + // impl later + SRpcInfo* pRpc = (SRpcInfo*)shandle; + + int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + + SRpcReqContext* pContext; + pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); + pContext->ahandle = pMsg->ahandle; + pContext->pRpc = (SRpcInfo*)shandle; + pContext->epSet = *pEpSet; + pContext->contLen = len; + pContext->pCont = pMsg->pCont; + pContext->msgType = pMsg->msgType; + pContext->oldInUse = pEpSet->inUse; + + assert(pRpc->connType == TAOS_CONN_CLIENT); + // atomic or not + int64_t index = pRpc->index; + if (pRpc->index++ >= pRpc->numOfThreads) { + pRpc->index = 0; + } + SCliMsg* msg = malloc(sizeof(SCliMsg)); + msg->context = pContext; + + SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; + + pthread_mutex_lock(&thrd->msgMtx); + QUEUE_PUSH(&thrd->msg, &msg->q); + pthread_mutex_unlock(&thrd->msgMtx); + + uv_async_send(thrd->cliAsync); +} +#endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c new file mode 100644 index 0000000000..f23cfb6e2d --- /dev/null +++ 
b/source/libs/transport/src/transComm.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef USE_UV + +#include "transComm.h" + +int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + int ret = -1; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; + + return ret; +} +void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + memcpy(pAuth, context.digest, sizeof(context.digest)); +} + +int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { + SRpcHead* pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char* buf = malloc(contLen + overhead + 8); // 8 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + 
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen > 0 && compLen < contLen - overhead) { + SRpcComp* pComp = (SRpcComp*)pCont; + pComp->reserved = 0; + pComp->contLen = htonl(contLen); + memcpy(pCont + overhead, buf, compLen); + + pHead->comp = 1; + tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); + finalLen = compLen + overhead; + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { + int overhead = sizeof(SRpcComp); + SRpcHead* pNewHead = NULL; + uint8_t* pCont = pHead->content; + SRpcComp* pComp = (SRpcComp*)pHead->content; + + if (pHead->comp) { + // decompress the content + assert(pComp->reserved == 0); + int contLen = htonl(pComp->contLen); + + // prepare the temporary buffer to decompress message + char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); + pNewHead = (temp != NULL) ? (SRpcHead*)(temp + sizeof(SRpcReqContext)) : NULL; // reserve SRpcReqContext; stays NULL when malloc fails + + if (pNewHead) { + int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; + int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); + assert(origLen == contLen); + + memcpy(pNewHead, pHead, sizeof(SRpcHead)); + pNewHead->msgLen = rpcMsgLenFromCont(origLen); + /// rpcFreeMsg(pHead); // free the compressed message buffer + pHead = pNewHead; + tTrace("decomp malloc mem:%p", temp); + } else { + tError("failed to allocate memory to decompress msg, contLen:%d", contLen); + } + } + + return pHead; +} + +#endif diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transSrv.c similarity index 55% rename from source/libs/transport/src/transport.c rename to source/libs/transport/src/transSrv.c
index 6cc2ca8c49..0bf39b9985 100644 --- a/source/libs/transport/src/transport.c +++ b/source/libs/transport/src/transSrv.c @@ -14,118 +14,7 @@ */ #ifdef USE_UV - -#include -#include "lz4.h" -#include "os.h" -#include "rpcCache.h" -#include "rpcHead.h" -#include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" -#include "taoserror.h" -#include "tglobal.h" -#include "thash.h" -#include "tidpool.h" -#include "tmd5.h" -#include "tmempool.h" -#include "tmsg.h" -#include "transportInt.h" -#include "tref.h" -#include "trpc.h" -#include "ttimer.h" -#include "tutil.h" - -#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) -static const char* notify = "a"; - -typedef struct { - int sessions; // number of sessions allowed - int numOfThreads; // number of threads to process incoming messages - int idleTime; // milliseconds; - uint16_t localPort; - int8_t connType; - int index; // for UDP server only, round robin for multiple threads - char label[TSDB_LABEL_LEN]; - - char user[TSDB_UNI_LEN]; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - - void (*cfp)(void* parent, SRpcMsg*, SEpSet*); - int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - - int32_t refCount; - void* parent; - void* idPool; // handle to ID pool - void* tmrCtrl; // handle to timer - SHashObj* hash; // handle returned by hash utility - void* tcphandle; // returned handle from TCP initialization - void* udphandle; // returned handle from UDP initialization - void* pCache; // connection cache - pthread_mutex_t mutex; - struct SRpcConn* connList; // connection list -} SRpcInfo; - -typedef struct { - SRpcInfo* pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - struct SRpcConn* 
pConn; // pConn allocated - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API - SEpSet* pSet; // for synchronous API - char msg[0]; // RpcHead starts from here -} SRpcReqContext; - -typedef struct SThreadObj { - pthread_t thread; - uv_pipe_t* pipe; - int fd; - uv_loop_t* loop; - uv_async_t* workerAsync; // - queue conn; - pthread_mutex_t connMtx; - void* shandle; -} SThreadObj; - -typedef struct SClientObj { - char label[TSDB_LABEL_LEN]; - int32_t index; - int numOfThreads; - SThreadObj** pThreadObj; -} SClientObj; - -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) -#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) -#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) -#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) -#define rpcIsReq(type) (type & 1U) - -typedef struct SServerObj { - pthread_t thread; - uv_tcp_t server; - uv_loop_t* loop; - int workerIdx; - int numOfThreads; - SThreadObj** pThreadObj; - uv_pipe_t** pipe; - uint32_t ip; - uint32_t port; -} SServerObj; +#include "transComm.h" typedef struct SConnBuffer { char* buf; @@ -134,7 +23,7 @@ typedef struct SConnBuffer { int left; } SConnBuffer; -typedef struct SRpcConn { +typedef struct SConn { uv_tcp_t* pTcp; uv_write_t* pWriter; uv_timer_t* pTimer; @@ -148,7 +37,7 @@ typedef struct SRpcConn { int count; void* shandle; // rpc init void* ahandle; // - void* hostThread; + void* hostThrd; // del later char secured; int spi; @@ -156,16 +45,37 @@ typedef 
struct SRpcConn { char user[TSDB_UNI_LEN]; // user ID for the link char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; // ciphering key -} SRpcConn; +} SConn; -// auth function -static int uvAuthMsg(SRpcConn* pConn, char* msg, int msgLen); -static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); -static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); -static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen); -// compress data -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); -static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); +typedef struct SWorkThrdObj { + pthread_t thread; + uv_pipe_t* pipe; + int fd; + uv_loop_t* loop; + uv_async_t* workerAsync; // + queue conn; + pthread_mutex_t connMtx; + void* shandle; +} SWorkThrdObj; + +typedef struct SServerObj { + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + int workerIdx; + int numOfThreads; + SWorkThrdObj** pThreadObj; + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; +} SServerObj; + +static const char* notify = "a"; + +// refactor later +static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen); + +static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -176,79 +86,17 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); -static SRpcConn* connCreate(); -static void connDestroy(SRpcConn* conn); -static void uvConnDestroy(uv_handle_t* handle); +// already read complete packet +static bool readComplete(SConnBuffer* buf); +static SConn* connCreate(); +static void connDestroy(SConn* conn); +static void uvConnDestroy(uv_handle_t* handle); + +// server worke thread static void* workerThread(void* 
arg); static void* acceptThread(void* arg); -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); - -void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {taosInitServer, taosInitClient}; - -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SClientObj* cli = calloc(1, sizeof(SClientObj)); - memcpy(cli->label, label, strlen(label)); - cli->numOfThreads = numOfThreads; - cli->pThreadObj = (SThreadObj**)calloc(cli->numOfThreads, sizeof(SThreadObj*)); - - for (int i = 0; i < cli->numOfThreads; i++) { - SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - - int err = pthread_create(&thrd->thread, NULL, workerThread, (void*)(thrd)); - if (err == 0) { - tDebug("sucess to create tranport-client thread %d", i); - } - } - return cli; -} - -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SServerObj* srv = calloc(1, sizeof(SServerObj)); - srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThreads = numOfThreads; - srv->workerIdx = 0; - srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThreads, sizeof(SThreadObj*)); - srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*)); - srv->ip = ip; - srv->port = port; - uv_loop_init(srv->loop); - - for (int i = 0; i < srv->numOfThreads; i++) { - SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); - int fds[2]; - if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - return NULL; - } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - thrd->shandle = shandle; - thrd->fd = fds[0]; - thrd->pipe = 
&(srv->pipe[i][1]); // init read - int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); - if (err == 0) { - tDebug("sucess to create worker-thread %d", i); - // printf("thread %d create\n", i); - } else { - // TODO: clear all other resource later - tError("failed to create worker-thread %d", i); - } - srv->pThreadObj[i] = thrd; - } - - int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); - if (err == 0) { - tDebug("success to create accept-thread"); - } else { - // clear all resource later - } - - return srv; -} void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* * formate of data buffer: @@ -256,8 +104,8 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b */ static const int CAPACITY = 1024; - SRpcConn* ctx = handle->data; - SConnBuffer* pBuf = &ctx->connBuf; + SConn* conn = handle->data; + SConnBuffer* pBuf = &conn->connBuf; if (pBuf->cap == 0) { pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); pBuf->len = 0; @@ -280,9 +128,10 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b buf->len = pBuf->cap - pBuf->len; } } + // check data read from socket completely or not // -static bool isReadAll(SConnBuffer* data) { +static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later SRpcHead rpcHead; int32_t headLen = sizeof(rpcHead); @@ -299,10 +148,11 @@ static bool isReadAll(SConnBuffer* data) { return false; } } + static void uvDoProcess(SRecvInfo* pRecv) { SRpcHead* pHead = (SRpcHead*)pRecv->msg; SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; - SRpcConn* pConn = pRecv->thandle; + SConn* pConn = pRecv->thandle; tDump(pRecv->msg, pRecv->msgLen); @@ -311,7 +161,8 @@ static void uvDoProcess(SRecvInfo* pRecv) { // do auth and check } -static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { + +static int uvAuthMsg(SConn* pConn, char* msg, int len) { SRpcHead* pHead = (SRpcHead*)msg; int 
code = 0; @@ -325,7 +176,8 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { if (!rpcIsReq(pHead->msgType)) { // for response, if code is auth failure, it shall bypass the auth process code = htonl(pHead->code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || + code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); @@ -361,12 +213,14 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { return code; } + // refers specifically to query or insert timeout static void uvHandleActivityTimeout(uv_timer_t* handle) { // impl later - SRpcConn* conn = handle->data; + SConn* conn = handle->data; } -static void uvProcessData(SRpcConn* pConn) { + +static void uvProcessData(SConn* pConn) { SRecvInfo info; SRecvInfo* p = &info; SConnBuffer* pBuf = &pConn->connBuf; @@ -408,13 +262,14 @@ static void uvProcessData(SRpcConn* pConn) { // auth // validate msg type } + void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SRpcConn* ctx = cli->data; + SConn* ctx = cli->data; SConnBuffer* pBuf = &ctx->connBuf; if (nread > 0) { pBuf->len += nread; - if (isReadAll(pBuf)) { + if (readComplete(pBuf)) { tDebug("alread read complete packet"); uvProcessData(ctx); } else { @@ -442,7 +297,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { } void uvOnWriteCb(uv_write_t* req, int status) { - SRpcConn* conn = req->data; + SConn* conn = req->data; if (status == 0) { tDebug("data already was written on stream"); } else { @@ -452,15 +307,15 @@ void uvOnWriteCb(uv_write_t* req, int status) { } void 
uvWorkerAsyncCb(uv_async_t* handle) { - SThreadObj* pThrd = container_of(handle, SThreadObj, workerAsync); - SRpcConn* conn = NULL; + SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); + SConn* conn = NULL; // opt later pthread_mutex_lock(&pThrd->connMtx); if (!QUEUE_IS_EMPTY(&pThrd->conn)) { queue* head = QUEUE_HEAD(&pThrd->conn); - conn = QUEUE_DATA(head, SRpcConn, queue); - QUEUE_REMOVE(&conn->queue); + conn = QUEUE_DATA(head, SConn, queue); + QUEUE_REMOVE(head); } pthread_mutex_unlock(&pThrd->connMtx); if (conn == NULL) { @@ -507,7 +362,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(buf->base[0] == notify[0]); free(buf->base); - SThreadObj* pThrd = q->data; + SWorkThrdObj* pThrd = q->data; uv_pipe_t* pipe = (uv_pipe_t*)q; if (!uv_pipe_pending_count(pipe)) { @@ -518,14 +373,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SRpcConn* pConn = connCreate(); + SConn* pConn = connCreate(); pConn->shandle = pThrd->shandle; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pConn->pTimer); pConn->pTimer->data = pConn; - pConn->hostThread = pThrd; + pConn->hostThrd = pThrd; pConn->pWorkerAsync = pThrd->workerAsync; // thread safty // init client handle @@ -564,17 +419,19 @@ void* acceptThread(void* arg) { uv_run(srv->loop, UV_RUN_DEFAULT); } void* workerThread(void* arg) { - SThreadObj* pThrd = (SThreadObj*)arg; + SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); + // SRpcInfo* pRpc = pThrd->shandle; uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; QUEUE_INIT(&pThrd->conn); + pthread_mutex_init(&pThrd->connMtx, NULL); pThrd->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); 
@@ -582,11 +439,12 @@ void* workerThread(void* arg) { uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_run(pThrd->loop, UV_RUN_DEFAULT); } -static SRpcConn* connCreate() { - SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn)); + +static SConn* connCreate() { + SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); return pConn; } -static void connDestroy(SRpcConn* conn) { +static void connDestroy(SConn* conn) { if (conn == NULL) { return; } @@ -600,78 +458,10 @@ static void connDestroy(SRpcConn* conn) { // handle } static void uvConnDestroy(uv_handle_t* handle) { - SRpcConn* conn = handle->data; + SConn* conn = handle->data; connDestroy(conn); } -void* rpcOpen(const SRpcInit* pInit) { - SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) { - return NULL; - } - if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); - } - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->connType = pInit->connType; - pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); - // pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); - return pRpc; -} -void rpcClose(void* arg) { return; } -void* rpcMallocCont(int contLen) { return NULL; } -void rpcFreeCont(void* cont) { return; } -void* rpcReallocCont(void* ptr, int contLen) { return NULL; } - -void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { - // impl later - return; -} - -void rpcSendResponse(const SRpcMsg* pMsg) { - SRpcConn* pConn = pMsg->handle; - SThreadObj* pThrd = pConn->hostThread; - - // opt later - pthread_mutex_lock(&pThrd->connMtx); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); - pthread_mutex_unlock(&pThrd->connMtx); - - uv_async_send(pConn->pWorkerAsync); -} - -void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} -int rpcGetConnInfo(void* thandle, 
SRpcConnInfo* pInfo) { return -1; } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } - -static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - int ret = -1; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; - - return ret; -} -static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - memcpy(pAuth, context.digest, sizeof(context.digest)); -} - -static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) { +static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) { SRpcHead* pHead = (SRpcHead*)msg; if (pConn->spi && pConn->secured == 0) { @@ -690,84 +480,61 @@ static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) { return msgLen; } -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHead* pHead = rpcHeadFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SServerObj* srv = calloc(1, sizeof(SServerObj)); + srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + srv->numOfThreads = numOfThreads; + srv->workerIdx = 0; + srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); + srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*)); + srv->ip = ip; + 
srv->port = port; + uv_loop_init(srv->loop); - if (!NEEDTO_COMPRESSS_MSG(contLen)) { - return contLen; - } - - char* buf = malloc(contLen + overhead + 8); // 8 extra bytes - if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); - return contLen; - } - - int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); - tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); - - /* - * only the compressed size is less than the value of contLen - overhead, the compression is applied - * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message - */ - if (compLen > 0 && compLen < contLen - overhead) { - SRpcComp* pComp = (SRpcComp*)pCont; - pComp->reserved = 0; - pComp->contLen = htonl(contLen); - memcpy(pCont + overhead, buf, compLen); - - pHead->comp = 1; - tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); - finalLen = compLen + overhead; - } else { - finalLen = contLen; - } - - free(buf); - return finalLen; -} - -static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { - int overhead = sizeof(SRpcComp); - SRpcHead* pNewHead = NULL; - uint8_t* pCont = pHead->content; - SRpcComp* pComp = (SRpcComp*)pHead->content; - - if (pHead->comp) { - // decompress the content - assert(pComp->reserved == 0); - int contLen = htonl(pComp->contLen); - - // prepare the temporary buffer to decompress message - char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); - pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext - - if (pNewHead) { - int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); - assert(origLen == contLen); - - memcpy(pNewHead, pHead, sizeof(SRpcHead)); - pNewHead->msgLen = rpcMsgLenFromCont(origLen); - /// rpcFreeMsg(pHead); // free the compressed message buffer - 
pHead = pNewHead; - tTrace("decomp malloc mem:%p", temp); - } else { - tError("failed to allocate memory to decompress msg, contLen:%d", contLen); + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + int fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + return NULL; } + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + thrd->shandle = shandle; + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create worker-thread %d", i); + // printf("thread %d create\n", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread %d", i); + } + srv->pThreadObj[i] = thrd; } - return pHead; -} -int32_t rpcInit(void) { - // impl later - return -1; + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept-thread"); + } else { + // clear all resource later + } + + return srv; } -void rpcCleanup(void) { - // impl later - return; +void rpcSendResponse(const SRpcMsg* pMsg) { + SConn* pConn = pMsg->handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + // opt later + pthread_mutex_lock(&pThrd->connMtx); + QUEUE_PUSH(&pThrd->conn, &pConn->queue); + pthread_mutex_unlock(&pThrd->connMtx); + + uv_async_send(pConn->pWorkerAsync); } + #endif diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 045fb8520e..58fbf6ae85 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -34,7 +34,8 @@ typedef struct { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response 
is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); + tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -185,7 +186,8 @@ int main(int argc, char *argv[]) { // float usedTime = (endTime - startTime) / 1000.0f; // mseconds // tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); - // tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); + // tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, + // msgSize); int ch = getchar(); UNUSED(ch); diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 151deaf29b..53910aa30c 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -22,6 +22,7 @@ #include #include +#include "transComm.h" #include "transportInt.h" #include "trpc.h" @@ -46,7 +47,7 @@ class QueueObj { if (!IsEmpty()) { queue *h = QUEUE_HEAD(&head); el = QUEUE_DATA(h, QueueElem, q); - QUEUE_REMOVE(&el->q); + QUEUE_REMOVE(h); } return el; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 98a6cde37a..a5dd1483ec 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed") diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh index 47802ea6a7..a86e9220cd 100755 --- 
a/tests/script/sh/massiveTable/deployCluster.sh +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -6,19 +6,19 @@ set -e #set -x # deployCluster.sh +curr_dir=$(readlink -f "$(dirname "$0")") +echo $curr_dir -curr_dir=$(pwd) +${curr_dir}/cleanCluster.sh -r "/data" +${curr_dir}/cleanCluster.sh -r "/data2" -source ./cleanCluster.sh -r /data -source ./cleanCluster.sh -r /data2 +${curr_dir}/compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" -source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" +${curr_dir}/setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000 +${curr_dir}/setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000 -source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000 -source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000 - -#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000 -#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000 +#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000 +#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000 diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh index 034c15c4eb..e45c7724ba 100755 --- a/tests/script/sh/massiveTable/setupDnodes.sh +++ b/tests/script/sh/massiveTable/setupDnodes.sh @@ -94,7 +94,7 @@ createNewDnodesDataDir() { mkdir -p ${dataRootDir}/dnode_${i}/data createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort} - echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" + #echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" serverPort=$((10#${serverPort}+100)) done } @@ -121,7 +121,7 @@ startDnodes() { ############################### main process ########################################## ## kill all taosd process -kill_process taosd +#kill_process taosd ## create director for all dnode if [[ "$enviMode" == "new" ]]; then @@ -131,6 +131,7 @@ fi ## start all dnode by nohup startDnodes ${dnodeNumber} -echo " run 
setupDnodes.sh end !!!" +echo "====run setupDnodes.sh end====" +echo " "