diff --git a/src/client/src/tscParseLineProtocol.c b/src/client/src/tscParseLineProtocol.c index c6cdee6f1f..7d2823a42e 100644 --- a/src/client/src/tscParseLineProtocol.c +++ b/src/client/src/tscParseLineProtocol.c @@ -484,7 +484,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm size_t tagIndex = taosArrayGetSize(schema->tags) - 1; taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); } - tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d", + tscDebug("SML:0x%"PRIx64 " load table meta succeed. table name: %s, columns number: %d, tag number: %d, precision: %d", info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); free(tableMeta); tableMeta = NULL; return code; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cebabfc024..963b831c1d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -337,11 +337,16 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { +static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { + SRpcMsg* rpcMsg = pSchedMsg->ahandle; + SRpcEpSet* pEpSet = pSchedMsg->thandle; + TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } @@ -359,17 +364,21 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", - pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); + pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } @@ -393,13 +402,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure - rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || + rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && - !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { + !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { // do nothing in case of super table subquery } else { pSql->retry += 1; @@ -422,6 +431,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); return; } } @@ -429,7 +440,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } pRes->rspLen = 0; - + if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); } else { @@ -473,12 +484,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { pRes->numOfRows += pMsg->affectedRows; tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], - tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); + tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); } else { tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); } } - + if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -499,6 +510,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); + free(rpcMsg); + free(pEpSet); +} + +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { + SSchedMsg schedMsg = {0}; + + schedMsg.fp = doProcessMsgFromServer; + + SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg)); + memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg)); + rpcMsgCopy->pCont = rpcMallocCont(rpcMsg->contLen); + memcpy(rpcMsgCopy->pCont, rpcMsg->pCont, rpcMsg->contLen); + schedMsg.ahandle = (void*)rpcMsgCopy; + + SRpcEpSet* pEpSetCopy = NULL; + if (pEpSet != NULL) { + pEpSetCopy = calloc(1, sizeof(SRpcEpSet)); + memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet)); + } + + schedMsg.thandle = (void*)pEpSetCopy; + schedMsg.msg = NULL; + + taosScheduleTask(tscQhandle, &schedMsg); } int doBuildAndSendMsg(SSqlObj *pSql) {