From 08f6856cc593449986416178b1c2c7a3ed9f60c6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 25 Jul 2022 11:32:12 +0000 Subject: [PATCH 1/8] optimize parser insert --- source/client/inc/clientInt.h | 8 ++++---- source/client/src/clientImpl.c | 8 ++++---- source/libs/parser/src/parInsert.c | 14 ++++++++------ 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 779fa68140..bab1852351 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -46,7 +46,7 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms -#define SYNC_ON_TOP_OF_ASYNC 1 +#define SYNC_ON_TOP_OF_ASYNC 0 enum { RES_TYPE__QUERY = 1, @@ -224,9 +224,9 @@ typedef struct SRequestObj { SArray* tableList; SQueryExecMetric metric; SRequestSendRecvBody body; - bool syncQuery; // todo refactor: async query object - bool stableQuery; // todo refactor - bool validateOnly; // todo refactor + bool syncQuery; // todo refactor: async query object + bool stableQuery; // todo refactor + bool validateOnly; // todo refactor bool killed; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 03238e6747..198f751c49 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -590,7 +590,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray return code; } -void freeVgList(void *list) { +void freeVgList(void* list) { SArray* pList = *(SArray**)list; taosArrayDestroy(pList); } @@ -1277,8 +1277,8 @@ int32_t doProcessMsgFromServer(void* param) { char tbuf[40] = {0}; TRACE_TO_STR(trace, tbuf); - tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), - tbuf); + tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), tbuf); if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); @@ -2113,7 +2113,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { return NULL; } - TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly); + TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly); return pRes; #endif } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 702422e022..d564d53633 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) { +static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, + SArray* tagName, uint8_t tagNum) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; pTbReq->ctb.tagNum = tagNum; - if(sname) pTbReq->ctb.name = strdup(sname); + if (sname) pTbReq->ctb.name = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->commentLen = -1; @@ -969,7 +970,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint goto end; } - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, + pCxt->pTableMeta->tableInfo.numOfTags); end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { @@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache)); CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); @@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols return ret; } - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, + pTableMeta->tableInfo.numOfTags); taosArrayDestroy(tagName); smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); From d2ce48f50111a336f129fc49eb8d1e7e16eaf756 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 26 Jul 2022 05:49:37 +0000 Subject: [PATCH 2/8] more --- source/libs/scheduler/src/schRemote.c | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index b794cb91f5..6633b9c21b 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -512,7 +512,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3 SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - msgSendInfo->paramFreeFp = taosMemoryFree; + msgSendInfo->paramFreeFp = taosMemoryFree; SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param)); SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp)); @@ -540,7 +540,7 @@ _return: } taosMemoryFree(msg); - + SCH_RET(code); } @@ -680,7 +680,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { param->pTrans = pJob->conn.pTrans; pMsgSendInfo->param = param; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = fp; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo}; @@ -800,7 +800,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { pDst->param = NULL; SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); - pDst->paramFreeFp = taosMemoryFree; + pDst->paramFreeFp = taosMemoryFree; *dst = pDst; @@ -1093,14 +1093,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } +#if 1 SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); - + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); } +#else + if (TDMT_VND_SUBMIT != msgType) { + SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; + schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + msg = NULL; + SCH_ERR_JRET(code); + + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { + SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); + } + } else { + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + } +#endif return TSDB_CODE_SUCCESS; From 1840d8d81235c2a777ecfc36c04314a90290016e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 28 Jul 2022 06:35:56 +0000 Subject: [PATCH 3/8] fix: replica coredump debug --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 3 ++- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 2 +- source/dnode/vnode/src/vnd/vnodeCommit.c | 2 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 5 ++++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index fa775bb882..50d7de3e11 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -374,11 +374,12 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData); taosWUnLockLatch(&pMemTable->latch); + tsdbDebug("vgId:%d add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx); + if (p == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - _exit: *ppTbData = pTbData; return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 43537c9a8d..2e2b7a7571 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -833,7 +833,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { } _exit: - tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); + tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); return code; _err: diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 1207df8058..e8d029a88f 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -288,7 +288,7 @@ int vnodeCommit(SVnode *pVnode) { // apply the commit (TODO) walEndSnapshot(pVnode->pWal); - vInfo("vgId:%d, commit over", TD_VID(pVnode)); + vInfo("vgId:%d, commit end", TD_VID(pVnode)); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 7241ca4971..ec03801acd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -180,6 +180,7 @@ struct SVSnapWriter { SVnode *pVnode; int64_t sver; int64_t ever; + int64_t commitID; int64_t index; // meta SMetaSnapWriter *pMetaSnapWriter; @@ -200,8 +201,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->pVnode = pVnode; pWriter->sver = sver; pWriter->ever = ever; + pWriter->commitID = pVnode->state.commitID; - vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode)); + vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), + sver, ever, pWriter->commitID); *ppWriter = pWriter; return code; From 49c6ba55c487ac92e339e85ddd2fb436ee3b9ef2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 28 Jul 2022 06:56:20 +0000 Subject: [PATCH 4/8] fix: vnode snapshot problem --- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index ec03801acd..b256ef20fe 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -201,7 +201,14 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->pVnode = pVnode; pWriter->sver = sver; pWriter->ever = ever; - pWriter->commitID = pVnode->state.commitID; + + // commit it + code = vnodeCommit(pVnode); + if (code) goto _err; + + // inc commit ID + pVnode->state.commitID++; + pWriter->commitID; vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), sver, ever, pWriter->commitID); @@ -247,6 +254,8 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * code = vnodeCommitInfo(dir, &info); if (code) goto _err; + + vnodeBegin(pVnode); } else { ASSERT(0); } From a3245a1e26ed40de1c9e6b3b2a060f38c5a72536 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 28 Jul 2022 07:03:46 +0000 Subject: [PATCH 5/8] fix: another replica coredump --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 2e2b7a7571..44481299b8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -756,7 +756,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { ASSERT(pWriter->pDataFReader); - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); ASSERT(c >= 0); From 8b3de3a216173948371589b01c40eb3ac70d5b79 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 28 Jul 2022 09:27:45 +0000 Subject: [PATCH 6/8] make pass CI --- source/client/inc/clientInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index fc1242f951..2831e7a904 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -46,7 +46,7 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms -#define SYNC_ON_TOP_OF_ASYNC 0 +#define SYNC_ON_TOP_OF_ASYNC 1 enum { RES_TYPE__QUERY = 1, From a1120d4c20ef7111fe48c652d91d3f9721bf94e4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 28 Jul 2022 10:49:07 +0000 Subject: [PATCH 7/8] fix: compile error --- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index b256ef20fe..3f8f81cb09 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -208,7 +208,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr // inc commit ID pVnode->state.commitID++; - pWriter->commitID; + pWriter->commitID = pVnode->state.commitID; vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), sver, ever, pWriter->commitID); From 9c87967a116fa4ec2e71feb64972b842757a7d7a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 29 Jul 2022 02:45:41 +0000 Subject: [PATCH 8/8] fix: another replica coredump --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 44481299b8..51d7edcf71 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -694,8 +694,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; _write_block: - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW, - pWriter->cmprAlg); + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);