Merge branch '3.0' into feature/TD-14481-3.0

This commit is contained in:
Cary Xu 2022-05-20 18:44:48 +08:00
commit 6a1649e6d9
12 changed files with 177 additions and 150 deletions

View File

@ -183,7 +183,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \
(_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
@ -194,7 +196,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define REQUEST_MAX_TRY_TIMES 5
#define REQUEST_MAX_TRY_TIMES 1
#define qFatal(...) \
do { \

View File

@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "command.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
@ -189,7 +189,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
}
}
if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
TSWAP(pRequest->dbList, (*pQuery)->pDbList);
TSWAP(pRequest->tableList, (*pQuery)->pTableList);
}
@ -483,7 +484,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t retryNum = 0;
int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
do {
destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
@ -494,9 +496,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest->code = code;
break;
}
destroyRequest(pRequest);
}
} while (retryNum++ < REQUEST_MAX_TRY_TIMES);
return pRequest;
}
@ -808,8 +808,7 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
static char* parseTagDatatoJson(void* p) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL)
{
if (json == NULL) {
goto end;
}
@ -837,8 +836,7 @@ static char* parseTagDatatoJson(void *p){
char type = *val;
if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL)
{
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
@ -854,8 +852,7 @@ static char* parseTagDatatoJson(void *p){
}
value = cJSON_CreateString(tagJsonValue);
taosMemoryFree(tagJsonValue);
if (value == NULL)
{
if (value == NULL) {
goto end;
}
} else if (varDataLen(realData) == 0) {
@ -868,8 +865,7 @@ static char* parseTagDatatoJson(void *p){
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL)
{
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
@ -884,15 +880,13 @@ static char* parseTagDatatoJson(void *p){
} else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL)
{
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
} else {
ASSERT(0);
}
}
string = cJSON_PrintUnformatted(json);
end:
@ -943,7 +937,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};

View File

@ -126,7 +126,9 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
}
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = {

View File

@ -70,12 +70,11 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
@ -85,12 +84,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
@ -216,13 +214,12 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
}
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
SRpcMsg * pRpc = pMsg;

View File

@ -129,7 +129,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
STqExec* pExec = NULL;
@ -239,10 +239,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
taosWUnLockLatch(&pExec->pushHandle.lock);
@ -663,10 +662,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
@ -845,12 +843,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*rsp.pBlockData = pRes;*/
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = msgLen;
pMsg->code = 0;
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
tmsgSendRsp(&resp);
taosMemoryFree(pHead);
return 0;
} else {
@ -878,10 +874,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&abuf, &rspV2);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/

View File

@ -184,8 +184,12 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
case TDMT_VND_STREAM_TRIGGER: {
// refactor, avoid double free
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
pMsg->pCont = NULL;
return code;
}
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
default:

View File

@ -41,6 +41,13 @@
sToken = tStrGetToken(pSql, &index, false); \
} while (0)
#define NEXT_VALID_TOKEN(pSql, sToken) \
do { \
sToken.n = tGetToken(pSql, &sToken.type); \
sToken.z = pSql; \
pSql += sToken.n; \
} while (TK_NK_SPACE == sToken.type)
typedef struct SInsertParseContext {
SParseContext* pComCxt; // input
char* pSql; // input
@ -482,9 +489,11 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_NK_INTEGER) {
return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
} else if (pToken->type == TK_NK_FLOAT) {
return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
@ -685,7 +694,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
isOrdered = false;
}
if (index < 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z);
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z);
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z);
@ -895,8 +904,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_COMMA == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
} else if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
}
@ -996,8 +1007,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
pDataBlock->size += extendedRowSize; // len;
}
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
NEXT_VALID_TOKEN(pCxt->pSql, sToken);
if (TK_NK_COMMA == sToken.type) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM);
} else if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z);
}
@ -1160,7 +1173,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL;
@ -1238,7 +1252,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context);
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
while (NULL != pTable) {
taosArrayPush((*pQuery)->pTableList, pTable);
@ -1688,8 +1702,8 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
return TSDB_CODE_SUCCESS;
}
int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols, bool format,
STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) {
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, char* msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
@ -1702,7 +1716,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
return ret;
}
SKVRow row = NULL;
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema, &row, &pBuf);
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema,
&row, &pBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}

View File

@ -120,6 +120,20 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const
return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta);
}
static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
SName name;
toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name);
int32_t code =
catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false);
if (TSDB_CODE_SUCCESS != code) {
parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName,
pTableName);
}
return code;
}
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
@ -3201,7 +3215,7 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) {
}
static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) {
return getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta);
return refreshGetTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta);
}
static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) {

View File

@ -94,7 +94,9 @@ void rpcFreeCont(void* cont) {
if (cont == NULL) {
return;
}
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int contLen) {
if (ptr == NULL) {

View File

@ -133,6 +133,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
} else {
p->cap = p->total;
p->buf = taosMemoryRealloc(p->buf, p->cap);
tTrace("internal malloc mem: %p, size: %d", p->buf, p->cap);
uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len;

View File

@ -469,6 +469,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
if (pConn->broken == true) {
// persist by
transFreeMsg(smsg->msg.pCont);
taosMemoryFree(smsg);
transUnrefSrvHandle(pConn);
return;
}