From d927e8c31a35f719c66c9bab0f4887839b8fe132 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 20 Dec 2024 16:50:49 +0800 Subject: [PATCH 1/8] fix:[TD-33272]refactor code --- source/dnode/vnode/src/tq/tqOffset.c | 29 +- source/dnode/vnode/src/tq/tqSnapshot.c | 226 +++++++------- source/libs/parser/src/parInsertSml.c | 391 +++++++++---------------- source/os/src/osString.c | 2 - 4 files changed, 260 insertions(+), 388 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index c42959971b..4d90091701 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -40,13 +40,11 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { return TSDB_CODE_INVALID_MSG; } int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; void* pMemBuf = NULL; TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ); - if (pFile == NULL) { - code = TDB_CODE_SUCCESS; - goto END; - } + TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS); int64_t ret = 0; int32_t size = 0; @@ -60,24 +58,16 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { } total += INT_BYTES; size = htonl(size); - if (size <= 0) { - code = TSDB_CODE_INVALID_MSG; - goto END; - } - pMemBuf = taosMemoryCalloc(1, size); - if (pMemBuf == NULL) { - code = terrno; - goto END; - } + TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG); - if (taosReadFile(pFile, pMemBuf, size) != size) { - terrno = TSDB_CODE_INVALID_MSG; - goto END; - } + pMemBuf = taosMemoryCalloc(1, size); + TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno); + TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG); total += size; STqOffset offset = {0}; - TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size)); + code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size); + TSDB_CHECK_CODE(code, lino, END); code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)); if (code != TDB_CODE_SUCCESS) { tDeleteSTqOffset(&offset); @@ -100,6 +90,9 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { } END: + if (code != 0){ + tqError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } taosCloseFile(&pFile); taosMemoryFree(pMemBuf); diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index 219ea4b6b4..ddbbba57a0 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -27,18 +27,16 @@ struct STqSnapReader { }; int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) { - if (pTq == NULL || ppReader == NULL) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = 0; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STqSnapReader* pReader = NULL; + TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(ppReader, code, lino, end, TSDB_CODE_INVALID_MSG); // alloc pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); - if (pReader == NULL) { - code = terrno; - goto _err; - } + TSDB_CHECK_NULL(pReader, code, lino, end, terrno); + pReader->pTq = pTq; pReader->sver = sver; pReader->ever = ever; @@ -54,28 +52,21 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqS pTb = pTq->pOffsetStore; } else { code = TSDB_CODE_INVALID_MSG; - goto _err; + goto end; } code = tdbTbcOpen(pTb, &pReader->pCur, NULL); - if (code) { - taosMemoryFree(pReader); - goto _err; - } - + TSDB_CHECK_CODE(code, lino, end); code = tdbTbcMoveToFirst(pReader->pCur); - if (code) { - taosMemoryFree(pReader); - goto _err; + TSDB_CHECK_CODE(code, lino, end); + tqInfo("vgId:%d, vnode tq snapshot reader opene success", TD_VID(pTq->pVnode)); + *ppReader = pReader; + +end: + if (code != 0){ + tqError("%s failed at %d, vnode tq snapshot reader open failed since %s", __func__, lino, tstrerror(code)); + taosMemoryFreeClear(pReader); } - tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); - - *ppReader = pReader; - return code; - -_err: - tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppReader = NULL; return code; } @@ -84,45 +75,37 @@ void tqSnapReaderClose(STqSnapReader** ppReader) { return; } tdbTbcClose((*ppReader)->pCur); - taosMemoryFree(*ppReader); - *ppReader = NULL; + taosMemoryFreeClear(*ppReader); } int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { - if (pReader == NULL || ppData == NULL) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = 0; - void* pKey = NULL; - void* pVal = NULL; - int32_t kLen = 0; - int32_t vLen = 0; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + void* pKey = NULL; + void* pVal = NULL; + int32_t kLen = 0; + int32_t vLen = 0; + TSDB_CHECK_NULL(pReader, code, lino, end, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(ppData, code, lino, end, TSDB_CODE_INVALID_MSG); - if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { - goto _exit; - } + code = tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen); + TSDB_CHECK_CODE(code, lino, end); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); - if (*ppData == NULL) { - code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - goto _err; - } + TSDB_CHECK_NULL(*ppData, code, lino, end, terrno); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); pHdr->type = pReader->type; pHdr->size = vLen; (void)memcpy(pHdr->data, pVal, vLen); + tqInfo("vgId:%d, vnode tq snapshot read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); -_exit: +end: + if (code != 0) { + tqError("%s failed at %d, vnode tq snapshot read data failed since %s", __func__, lino, tstrerror(code)); + } tdbFree(pKey); tdbFree(pVal); - tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen); - return code; - -_err: - tdbFree(pKey); - tdbFree(pVal); - tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); return code; } @@ -135,135 +118,148 @@ struct STqSnapWriter { }; int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { - if (pTq == NULL || ppWriter == NULL) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = 0; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STqSnapWriter* pWriter = NULL; + TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG); + // alloc pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { - code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); - ; - goto _err; - } + TSDB_CHECK_NULL(pWriter, code, lino, end, terrno); pWriter->pTq = pTq; pWriter->sver = sver; pWriter->ever = ever; code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0); - if (code < 0) { - taosMemoryFree(pWriter); - goto _err; - } - + TSDB_CHECK_CODE(code, lino, end); + tqInfo("vgId:%d, tq snapshot writer opene success", TD_VID(pTq->pVnode)); *ppWriter = pWriter; - return code; -_err: - tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); - *ppWriter = NULL; +end: + if (code != 0){ + tqError("%s failed at %d tq snapshot writer open failed since %s", __func__, lino, tstrerror(code)); + taosMemoryFreeClear(pWriter); + } return code; } int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { - if (ppWriter == NULL || *ppWriter == NULL) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = 0; - STqSnapWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + STqSnapWriter* pWriter = NULL; + + TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(*ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG); + pWriter = *ppWriter; if (rollback) { tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); } else { code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, end); + code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, end); } + tqInfo("vgId:%d, tq snapshot writer close success", TD_VID(pWriter->pTq->pVnode)); + taosMemoryFreeClear(*ppWriter); - taosMemoryFree(pWriter); - *ppWriter = NULL; - - return code; - -_err: - tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); +end: + if (code != 0){ + tqError("%s failed at %d, tq snapshot writer close failed since %s", __func__, lino, tstrerror(code)); + } return code; } -int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) { - return TSDB_CODE_INVALID_MSG; +static int32_t tqWriteCheck(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData){ + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + TSDB_CHECK_NULL(pWriter, code, lino, end, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(pData, code, lino, end, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_CONDITION(nData >= sizeof(SSnapDataHdr), code, lino, end, TSDB_CODE_INVALID_MSG); +end: + if (code != 0){ + tqError("%s failed at %d failed since %s", __func__, lino, tstrerror(code)); } - int32_t code = 0; - STQ* pTq = pWriter->pTq; + return code; +} +int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SDecoder decoder = {0}; SDecoder* pDecoder = &decoder; STqHandle handle = {0}; + code = tqWriteCheck(pWriter, pData, nData); + TSDB_CHECK_CODE(code, lino, end); + STQ* pTq = pWriter->pTq; tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tDecodeSTqHandle(pDecoder, &handle); - if (code) goto end; + TSDB_CHECK_CODE(code, lino, end); + taosWLockLatch(&pTq->lock); code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); taosWUnLockLatch(&pTq->lock); + TSDB_CHECK_CODE(code, lino, end); + tqInfo("vgId:%d, vnode tq snapshot write success", TD_VID(pTq->pVnode)); end: tDecoderClear(pDecoder); tqDestroyTqHandle(&handle); - tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code); + if (code != 0){ + tqError("%s failed at %d, vnode tq snapshot write failed since %s", __func__, lino, tstrerror(code)); + } return code; } int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = 0; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + code = tqWriteCheck(pWriter, pData, nData); + TSDB_CHECK_CODE(code, lino, end); + STQ* pTq = pWriter->pTq; STqCheckInfo info = {0}; code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - if (code != 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, end); code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); tDeleteSTqCheckInfo(&info); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, end); + tqInfo("vgId:%d, vnode tq check info write success", TD_VID(pTq->pVnode)); - return code; - -_err: - tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); +end: + if (code != 0){ + tqError("%s failed at %d, vnode tq check info write failed since %s", __func__, lino, tstrerror(code)); + } return code; } int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = 0; - STQ* pTq = pWriter->pTq; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + code = tqWriteCheck(pWriter, pData, nData); + TSDB_CHECK_CODE(code, lino, end); + STQ* pTq = pWriter->pTq; STqOffset info = {0}; code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - if (code != 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, end); code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); tDeleteSTqOffset(&info); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, end); + tqInfo("vgId:%d, vnode tq offset write success", TD_VID(pTq->pVnode)); - return code; - -_err: - tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); +end: + if (code != 0){ + tqError("%s failed at %d, vnode tq offset write failed since %s", __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 22d1f7edda..d56cf7916f 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -20,40 +20,32 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t msgBufLen) { - SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; - SToken sToken; - int32_t code = 0; - char* tbName = NULL; + SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; + SToken sToken = {0}; + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; NEXT_TOKEN(pTableName, sToken); - - if (sToken.n == 0) { - return buildInvalidOperationMsg(&msg, "empty table name"); - } - + TSDB_CHECK_CONDITION(sToken.n != 0, code, lino, end, TSDB_CODE_TSC_INVALID_OPERATION); code = insCreateSName(pName, &sToken, acctId, dbName, &msg); - if (code) { - return code; - } - + TSDB_CHECK_CODE(code, lino, end); NEXT_TOKEN(pTableName, sToken); + TSDB_CHECK_CONDITION(sToken.n <= 0, code, lino, end, TSDB_CODE_TSC_INVALID_OPERATION); - if (sToken.n > 0) { - return buildInvalidOperationMsg(&msg, "table name format is wrong"); +end: + if (code != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); } - return TSDB_CODE_SUCCESS; } static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) { + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); - if (NULL == pUseCols) { - return terrno; - } - + TSDB_CHECK_NULL(pUseCols, code, lino, end, terrno); pBoundInfo->numOfBound = 0; int16_t lastColIdx = -1; // last column found - int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < taosArrayGetSize(cols); ++i) { SSmlKv* kv = taosArrayGet(cols, i); @@ -65,16 +57,9 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche index = insFindCol(&sToken, 0, t, pSchema); } - if (index < 0) { - uError("smlBoundColumnData. index:%d", index); - code = TSDB_CODE_SML_INVALID_DATA; - goto end; - } - if (pUseCols[index]) { - uError("smlBoundColumnData. already set. index:%d", index); - code = TSDB_CODE_SML_INVALID_DATA; - goto end; - } + TSDB_CHECK_CONDITION(index >= 0, code, lino, end, TSDB_CODE_SML_INVALID_DATA); + TSDB_CHECK_CONDITION(!pUseCols[index], code, lino, end, TSDB_CODE_SML_INVALID_DATA); + lastColIdx = index; pUseCols[index] = true; pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; @@ -82,11 +67,30 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche } end: + if (code != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } taosMemoryFree(pUseCols); - return code; } +static int32_t smlMbsToUcs4(const char* mbs, size_t mbsLen, void** result, int32_t* resultLen, int32_t maxLen, void* charsetCxt){ + int code = TSDB_CODE_SUCCESS; + void* pUcs4 = NULL; + int32_t lino = 0; + pUcs4 = taosMemoryCalloc(1, maxLen); + TSDB_CHECK_NULL(pUcs4, code, lino, end, terrno); + TSDB_CHECK_CONDITION(taosMbsToUcs4(mbs, mbsLen, (TdUcs4*)pUcs4, maxLen, resultLen, charsetCxt), code, lino, end, terrno); + *result = pUcs4; + pUcs4 = NULL; + +end: + if (code != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } + taosMemoryFree(pUcs4); + return code; +} /** * @brief No json tag for schemaless * @@ -99,75 +103,39 @@ end: */ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName, SMsgBuf* msg, void* charsetCxt) { - SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); - if (!pTagArray) { - return terrno; - } + int code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); + TSDB_CHECK_NULL(pTagArray, code, lino, end, terrno); *tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); - if (!*tagName) { - return terrno; - } + TSDB_CHECK_NULL(*tagName, code, lino, end, terrno); - int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < tags->numOfBound; ++i) { SSchema* pTagSchema = &pSchema[tags->pColIndex[i]]; SSmlKv* kv = taosArrayGet(cols, i); - if (kv == NULL){ - code = terrno; - uError("SML smlBuildTagRow error kv is null"); - goto end; - } - if (kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || - kv->type != pTagSchema->type) { - code = TSDB_CODE_SML_INVALID_DATA; - uError("SML smlBuildTagRow error col not same %s", pTagSchema->name); - goto end; - } - - if (taosArrayPush(*tagName, pTagSchema->name) == NULL){ - code = terrno; - uError("SML smlBuildTagRow error push tag name"); - goto end; - } + TSDB_CHECK_NULL(kv, code, lino, end, terrno); + bool cond = (kv->keyLen == strlen(pTagSchema->name) && memcmp(kv->key, pTagSchema->name, kv->keyLen) == 0 && kv->type == pTagSchema->type); + TSDB_CHECK_CONDITION(cond, code, lino, end, TSDB_CODE_SML_INVALID_DATA); + TSDB_CHECK_NULL(taosArrayPush(*tagName, pTagSchema->name), code, lino, end, terrno); STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; - // strcpy(val.colName, pTagSchema->name); if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY || pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) { val.pData = (uint8_t*)kv->value; val.nData = kv->length; } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { - int32_t output = 0; - void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE); - if (p == NULL) { - code = terrno; - goto end; - } - if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output, charsetCxt)) { - if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) { - taosMemoryFree(p); - code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name); - goto end; - } - char buf[512] = {0}; - (void)snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(terrno)); - taosMemoryFree(p); - code = buildSyntaxErrMsg(msg, buf, kv->value); - goto end; - } - val.pData = p; - val.nData = output; + code = smlMbsToUcs4(kv->value, kv->length, (void**)&val.pData, &val.nData, kv->length * TSDB_NCHAR_SIZE, charsetCxt); + TSDB_CHECK_CODE(code, lino, end); } else { (void)memcpy(&val.i64, &(kv->value), kv->length); } - if (taosArrayPush(pTagArray, &val) == NULL){ - code = terrno; - uError("SML smlBuildTagRow error push tag array"); - goto end; - } + TSDB_CHECK_NULL(taosArrayPush(pTagArray, &val), code, lino, end, terrno); } - code = tTagNew(pTagArray, 1, false, ppTag); + end: + if (code != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); if (p->type == TSDB_DATA_TYPE_NCHAR) { @@ -179,18 +147,20 @@ end: } int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt) { + int ret = TSDB_CODE_SUCCESS; + int32_t lino = 0; SVCreateTbReq* pCreateTbReq = NULL; - int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, + ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, cxt, false, false); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - + TSDB_CHECK_CODE(ret, lino, end); ret = initTableColSubmitData(*cxt); - if (ret != TSDB_CODE_SUCCESS) { - return ret; + TSDB_CHECK_CODE(ret, lino, end); + +end: + if (ret != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(ret)); } - return TSDB_CODE_SUCCESS; + return ret; } void clearColValArraySml(SArray* pCols) { @@ -207,78 +177,51 @@ void clearColValArraySml(SArray* pCols) { } int32_t smlBuildRow(STableDataCxt* pTableCxt) { - SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); - if (pRow == NULL){ - return terrno; - } - int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); - if (TSDB_CODE_SUCCESS != ret) { - return ret; - } + int ret = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); + TSDB_CHECK_NULL(pRow, ret, lino, end, terrno); + ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); + TSDB_CHECK_CODE(ret, lino, end); SRowKey key; tRowGetKey(*pRow, &key); insCheckTableDataOrder(pTableCxt, &key); - return TSDB_CODE_SUCCESS; +end: + if (ret != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(ret)); + } + return ret; } int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index, void* charsetCxt) { int ret = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSchema* pColSchema = schema + index; SColVal* pVal = taosArrayGet(pTableCxt->pValues, index); - if (pVal == NULL) { - return TSDB_CODE_SUCCESS; - } + TSDB_CHECK_NULL(pVal, ret, lino, end, TSDB_CODE_SUCCESS); SSmlKv* kv = (SSmlKv*)data; if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type) { ret = TSDB_CODE_SML_INVALID_DATA; char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1); - if (tmp) { - (void)memcpy(tmp, kv->key, kv->keyLen); - uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name, - pColSchema->name, tDataTypes[pColSchema->type].name); - taosMemoryFree(tmp); - } else { - uError("SML smlBuildCol out of memory"); - ret = terrno; - } + TSDB_CHECK_NULL(tmp, ret, lino, end, terrno); + (void)memcpy(tmp, kv->key, kv->keyLen); + uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name, + pColSchema->name, tDataTypes[pColSchema->type].name); + taosMemoryFree(tmp); goto end; } if (kv->type == TSDB_DATA_TYPE_NCHAR) { - int32_t len = 0; - int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE; - if (size <= 0) { - ret = TSDB_CODE_SML_INVALID_DATA; - goto end; - } - char* pUcs4 = taosMemoryCalloc(1, size); - if (NULL == pUcs4) { - ret = terrno; - goto end; - } - if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, size, &len, charsetCxt)) { - if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) { - taosMemoryFree(pUcs4); - ret = TSDB_CODE_PAR_VALUE_TOO_LONG; - goto end; - } - taosMemoryFree(pUcs4); - ret = TSDB_CODE_TSC_INVALID_VALUE; - goto end; - } - pVal->value.pData = pUcs4; - pVal->value.nData = len; + ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, &pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt); + TSDB_CHECK_CODE(ret, lino, end); } else if (kv->type == TSDB_DATA_TYPE_BINARY) { pVal->value.nData = kv->length; pVal->value.pData = (uint8_t*)kv->value; } else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) { pVal->value.nData = kv->length; pVal->value.pData = taosMemoryMalloc(kv->length); - if (!pVal->value.pData) { - ret = terrno; - uError("SML smlBuildCol malloc failed %s:%d, err: %s", __func__, __LINE__, tstrerror(ret)); - goto end; - } + TSDB_CHECK_NULL(pVal->value.pData, ret, lino, end, terrno); + (void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length); } else { (void)memcpy(&pVal->value.val, &(kv->value), kv->length); @@ -286,12 +229,17 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 pVal->flag = CV_FLAG_VALUE; end: + if (ret != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(ret)); + } return ret; } int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int32_t msgBufLen, void* charsetCxt) { + int32_t lino = 0; + int32_t ret = 0; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSchema* pTagsSchema = getTableTagSchema(pTableMeta); @@ -299,50 +247,32 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc SVCreateTbReq* pCreateTblReq = NULL; SArray* tagName = NULL; - int ret = insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags); - if (ret != TSDB_CODE_SUCCESS) { - ret = buildInvalidOperationMsg(&pBuf, "init bound cols error"); - goto end; - } + ret = insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags); + TSDB_CHECK_CODE(ret, lino, end); ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true); - if (ret != TSDB_CODE_SUCCESS) { - ret = buildInvalidOperationMsg(&pBuf, "bound tags error"); - goto end; - } + TSDB_CHECK_CODE(ret, lino, end); STag* pTag = NULL; - ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf, charsetCxt); - if (ret != TSDB_CODE_SUCCESS) { - goto end; - } + TSDB_CHECK_CODE(ret, lino, end); pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); - if (NULL == pCreateTblReq) { - ret = terrno; - goto end; - } + TSDB_CHECK_NULL(pCreateTblReq, ret, lino, end, terrno); + ret = insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags, ttl); - if (TSDB_CODE_SUCCESS != ret) { - goto end; - } + TSDB_CHECK_CODE(ret, lino, end); pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1); - if (pCreateTblReq->ctb.stbName == NULL){ - ret = terrno; - goto end; - } + TSDB_CHECK_NULL(pCreateTblReq->ctb.stbName, ret, lino, end, terrno); + (void)memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen); if (dataFormat) { STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); - if (NULL == pTableCxt) { - ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error"); - goto end; - } + TSDB_CHECK_NULL(pTableCxt, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION); (*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; (*pTableCxt)->pData->pCreateTbReq = pCreateTblReq; (*pTableCxt)->pMeta->uid = pTableMeta->uid; @@ -354,86 +284,47 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc STableDataCxt* pTableCxt = NULL; ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false); - if (ret != TSDB_CODE_SUCCESS) { - ret = buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error"); - goto end; - } + TSDB_CHECK_CODE(ret, lino, end); SSchema* pSchema = getTableColumnSchema(pTableMeta); ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false); - if (ret != TSDB_CODE_SUCCESS) { - ret = buildInvalidOperationMsg(&pBuf, "bound cols error"); - goto end; - } + TSDB_CHECK_CODE(ret, lino, end); ret = initTableColSubmitData(pTableCxt); - if (ret != TSDB_CODE_SUCCESS) { - ret = buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error"); - goto end; - } + TSDB_CHECK_CODE(ret, lino, end); int32_t rowNum = taosArrayGetSize(cols); - if (rowNum <= 0) { - ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0"); - goto end; - } + TSDB_CHECK_CONDITION(rowNum > 0, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION); for (int32_t r = 0; r < rowNum; ++r) { void* rowData = taosArrayGetP(cols, r); - if (rowData == NULL) { - ret = terrno; - goto end; - } + TSDB_CHECK_NULL(rowData, ret, lino, end, terrno); + // 1. set the parsed value from sql string for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) { SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]]; SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]); - if (pVal == NULL) { - ret = terrno; - goto end; - } + TSDB_CHECK_NULL(pVal, ret, lino, end, terrno); void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); if (p == NULL) { continue; } SSmlKv* kv = *(SSmlKv**)p; - if (kv->type != pColSchema->type) { - ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type"); - goto end; - } + TSDB_CHECK_CONDITION(kv->type == pColSchema->type, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION); + if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); } if (kv->type == TSDB_DATA_TYPE_NCHAR) { - int32_t len = 0; - char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE); - if (NULL == pUcs4) { - ret = terrno; - goto end; - } - if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len, charsetCxt)) { - if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) { - uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, - pColSchema->bytes, kv->value); - (void)buildInvalidOperationMsg(&pBuf, "value too long"); - ret = TSDB_CODE_PAR_VALUE_TOO_LONG; - goto end; - } - ret = buildInvalidOperationMsg(&pBuf, strerror(terrno)); - goto end; - } - pVal->value.pData = pUcs4; - pVal->value.nData = len; + ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, &pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt); + TSDB_CHECK_CODE(ret, lino, end); } else if (kv->type == TSDB_DATA_TYPE_BINARY) { pVal->value.nData = kv->length; pVal->value.pData = (uint8_t*)kv->value; } else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) { pVal->value.nData = kv->length; pVal->value.pData = taosMemoryMalloc(kv->length); - if (NULL == pVal->value.pData) { - ret = terrno; - goto end; - } + TSDB_CHECK_NULL(pVal->value.pData, ret, lino, end, terrno); (void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length); } else { (void)memcpy(&pVal->value.val, &(kv->value), kv->length); @@ -442,22 +333,20 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc } SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); - if (NULL == pRow) { - ret = terrno; - goto end; - } + TSDB_CHECK_NULL(pRow, ret, lino, end, terrno); ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); - if (TSDB_CODE_SUCCESS != ret) { - ret = buildInvalidOperationMsg(&pBuf, "tRowBuild error"); - goto end; - } - SRowKey key; + TSDB_CHECK_CODE(ret, lino, end); + SRowKey key = {0}; tRowGetKey(*pRow, &key); insCheckTableDataOrder(pTableCxt, &key); clearColValArraySml(pTableCxt->pValues); } end: + if (ret != 0){ + uError("%s failed at %d since %s", __func__, lino, tstrerror(ret)); + buildInvalidOperationMsg(&pBuf, tstrerror(ret)); + } insDestroyBoundColInfo(&bindTags); tdDestroySVCreateTbReq(pCreateTblReq); taosMemoryFree(pCreateTblReq); @@ -466,29 +355,21 @@ end: } int32_t smlInitHandle(SQuery** query) { + int32_t lino = 0; + int32_t code = 0; *query = NULL; SQuery* pQuery = NULL; SVnodeModifyOpStmt* stmt = NULL; - int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery); - if (code != 0) { - uError("SML create pQuery error"); - goto END; - } + code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery); + TSDB_CHECK_CODE(code, lino, end); pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->haveResultSet = false; pQuery->msgType = TDMT_VND_SUBMIT; code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt); - if (code != 0) { - uError("SML create SVnodeModifyOpStmt error"); - goto END; - } + TSDB_CHECK_CODE(code, lino, end); stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (stmt->pTableBlockHashObj == NULL){ - uError("SML create pTableBlockHashObj error"); - code = terrno; - goto END; - } + TSDB_CHECK_NULL(stmt->pTableBlockHashObj, code, lino, end, terrno); stmt->freeHashFunc = insDestroyTableDataCxtHashMap; stmt->freeArrayFunc = insDestroyVgroupDataCxtList; @@ -496,24 +377,28 @@ int32_t smlInitHandle(SQuery** query) { *query = pQuery; return code; -END: +end: + if (code != 0) { + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } nodesDestroyNode((SNode*)stmt); qDestroyQuery(pQuery); return code; } int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) { + int32_t lino = 0; + int32_t code = 0; + SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot; - // merge according to vgId - int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true); - if (code != TSDB_CODE_SUCCESS) { - uError("insMergeTableDataCxt failed"); - return code; - } + code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true); + TSDB_CHECK_CODE(code, lino, end); code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); - if (code != TSDB_CODE_SUCCESS) { - uError("insBuildVgDataBlocks failed"); - return code; + TSDB_CHECK_CODE(code, lino, end); + +end: + if (code != 0) { + uError("%s failed at %d since %s", __func__, lino, tstrerror(code)); } return code; } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 380e9f84d3..1d07b64c70 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -405,8 +405,6 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 size_t ucs4_input_len = mbsLength; size_t outLeft = ucs4_max_len; if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { - char buf[512] = {0}; - snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s %d %d", strerror(terrno), errno, EILSEQ); terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseConv(idx, conv, M2C, charsetCxt); return false; From a1e20680f78a8bcf85975843f8052fbc329002ae Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 23 Dec 2024 15:57:49 +0800 Subject: [PATCH 2/8] fix:[TD-33272]add test case --- source/client/test/smlTest.cpp | 1 + source/common/src/msg/tmsg.c | 2 + source/dnode/vnode/src/tq/tq.c | 3 +- source/dnode/vnode/src/tq/tqOffset.c | 72 ++++++++++++----------- source/dnode/vnode/test/CMakeLists.txt | 38 ++++++------- source/dnode/vnode/test/tqTest.cpp | 79 ++++++++++++++++++++++++++ source/libs/parser/src/parInsertSml.c | 3 +- utils/test/c/sml_test.c | 25 ++++++++ 8 files changed, 166 insertions(+), 57 deletions(-) create mode 100644 source/dnode/vnode/test/tqTest.cpp diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 338457bec4..968a4e7c75 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -276,6 +276,7 @@ TEST(testCase, smlParseCols_Test) { info->dataFormat = false; SSmlLineInfo elements = {0}; info->msgBuf = msgBuf; + ASSERT_EQ(smlInitHandle(NULL), const char *data = "st,t=1 cb\\=in=\"pass\\,it " diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 2193c7983f..166c889947 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11141,6 +11141,7 @@ void tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) { } void tOffsetDestroy(void *param) { + if (param == NULL) return; STqOffsetVal *pVal = (STqOffsetVal *)param; if (IS_VAR_DATA_TYPE(pVal->primaryKey.type)) { taosMemoryFreeClear(pVal->primaryKey.pData); @@ -11148,6 +11149,7 @@ void tOffsetDestroy(void *param) { } void tDeleteSTqOffset(void *param) { + if (param == NULL) return; STqOffset *pVal = (STqOffset *)param; tOffsetDestroy(&pVal->val); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 03037e529b..37f3572f65 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -152,10 +152,9 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); - int32_t vgId = pTq->pStreamMeta->vgId; streamMetaClose(pTq->pStreamMeta); - qDebug("vgId:%d end to close tq", vgId); + qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); taosMemoryFree(pTq); } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 4d90091701..57a901a2e1 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -17,33 +17,41 @@ #include "tq.h" int32_t tqBuildFName(char** data, const char* path, char* name) { - if (data == NULL || path == NULL || name == NULL) { - return TSDB_CODE_INVALID_MSG; - } + int32_t code = 0; + int32_t lino = 0; + char* fname = NULL; + TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG); int32_t len = strlen(path) + strlen(name) + 2; - char* fname = taosMemoryCalloc(1, len); - if(fname == NULL) { - return terrno; - } - int32_t code = tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); - if (code < 0){ - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(fname); - return code; - } + fname = taosMemoryCalloc(1, len); + TSDB_CHECK_NULL(fname, code, lino, END, terrno); + code = tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); + TSDB_CHECK_CODE(code, lino, END); + *data = fname; - return TDB_CODE_SUCCESS; + fname = NULL; + +END: + if (code != 0){ + tqError("%s failed at %d since %s", __func__, lino, tstrerror(code)); + } + taosMemoryFree(fname); + return code; } int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { - if (pTq == NULL || name == NULL) { - return TSDB_CODE_INVALID_MSG; - } - int32_t code = TDB_CODE_SUCCESS; - int32_t lino = 0; - void* pMemBuf = NULL; + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; + void* pMemBuf = NULL; + TdFilePtr pFile = NULL; + STqOffset *pOffset = NULL; + void *pIter = NULL; - TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ); + TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_MSG); + TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG); + + pFile = taosOpenFile(name, TD_FILE_READ); TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS); int64_t ret = 0; @@ -68,25 +76,20 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { STqOffset offset = {0}; code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size); TSDB_CHECK_CODE(code, lino, END); - code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)); - if (code != TDB_CODE_SUCCESS) { - tDeleteSTqOffset(&offset); - goto END; - } + pOffset = &offset; + code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); + TSDB_CHECK_CODE(code, lino, END); + pOffset = NULL; tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset), offset.subKey); taosMemoryFree(pMemBuf); pMemBuf = NULL; } - void *pIter = NULL; while ((pIter = taosHashIterate(pTq->pOffset, pIter))) { - STqOffset* pOffset = (STqOffset*)pIter; - code = tqMetaSaveOffset(pTq, pOffset); - if(code != 0){ - taosHashCancelIterate(pTq->pOffset, pIter); - goto END; - } + STqOffset* offset = (STqOffset*)pIter; + code = tqMetaSaveOffset(pTq, offset); + TSDB_CHECK_CODE(code, lino, END); } END: @@ -96,5 +99,8 @@ END: taosCloseFile(&pFile); taosMemoryFree(pMemBuf); + tDeleteSTqOffset(pOffset); + taosHashCancelIterate(pTq->pOffset, pIter); + return code; } diff --git a/source/dnode/vnode/test/CMakeLists.txt b/source/dnode/vnode/test/CMakeLists.txt index 724eabc751..826296e99f 100644 --- a/source/dnode/vnode/test/CMakeLists.txt +++ b/source/dnode/vnode/test/CMakeLists.txt @@ -1,29 +1,25 @@ -MESSAGE(STATUS "vnode unit test") +MESSAGE(STATUS "tq unit test") # GoogleTest requires at least C++11 SET(CMAKE_CXX_STANDARD 11) -AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) -# add_executable(tqTest "") -# target_sources(tqTest -# PRIVATE -# "tqMetaTest.cpp" -# ) -# target_include_directories(tqTest -# PUBLIC -# "${TD_SOURCE_DIR}/include/server/vnode/tq" -# "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -# ) +add_executable(tqTest tqTest.cpp) +target_include_directories(tqTest + PUBLIC + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) -# target_link_libraries(tqTest -# tq -# gtest_main -# ) -# enable_testing() -# add_test( -# NAME tq_test -# COMMAND tqTest -# ) +TARGET_LINK_LIBRARIES( + tqTest + PUBLIC os util common vnode gtest_main +) + +enable_testing() + +add_test( + NAME tq_test + COMMAND tqTest +) # ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp) # TARGET_LINK_LIBRARIES( diff --git a/source/dnode/vnode/test/tqTest.cpp b/source/dnode/vnode/test/tqTest.cpp new file mode 100644 index 0000000000..1b5fe4fdcd --- /dev/null +++ b/source/dnode/vnode/test/tqTest.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include + +#include +#include +#include + +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +SDmNotifyHandle dmNotifyHdl = {.state = 0}; + +#include "tq.h" +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +STqOffset offset = {.subKey = "testtest", .val = {.type = TMQ_OFFSET__LOG, .version = 8923}}; + +void tqWriteOffset() { + TdFilePtr pFile = taosOpenFile(TQ_OFFSET_NAME, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + + int32_t bodyLen; + int32_t code; + tEncodeSize(tEncodeSTqOffset, &offset, bodyLen, code); + int32_t totLen = INT_BYTES + bodyLen; + void* buf = taosMemoryCalloc(1, totLen); + void* abuf = POINTER_SHIFT(buf, INT_BYTES); + + *(int32_t*)buf = htonl(bodyLen); + SEncoder encoder; + tEncoderInit(&encoder, (uint8_t*)abuf, bodyLen); + tEncodeSTqOffset(&encoder, &offset); + taosWriteFile(pFile, buf, totLen); + + taosMemoryFree(buf); + + taosCloseFile(&pFile); +} + +TEST(testCase, tqOffsetTest) { + STQ* pTq = (STQ*)taosMemoryCalloc(1, sizeof(STQ)); + pTq->path = taosStrdup("./"); + + pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK); + taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset); + + tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL); + tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0); + + tqWriteOffset(); + tqOffsetRestoreFromFile(pTq, TQ_OFFSET_NAME); + taosRemoveFile(TQ_OFFSET_NAME); + tqClose(pTq); +} + +#pragma GCC diagnostic pop diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index d56cf7916f..bf86ef718e 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -357,10 +357,11 @@ end: int32_t smlInitHandle(SQuery** query) { int32_t lino = 0; int32_t code = 0; - *query = NULL; SQuery* pQuery = NULL; SVnodeModifyOpStmt* stmt = NULL; + TSDB_CHECK_NULL(query, code, lino, end, TSDB_CODE_INVALID_PARA); + *query = NULL; code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery); TSDB_CHECK_CODE(code, lino, end); pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index d922a6454e..0907c2a641 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1939,6 +1939,20 @@ int sml_td24559_Test() { } taos_free_result(pRes); + const char *sql2[] = { + "stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299375000", + }; + + pRes = taos_query(taos, "use td24559"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert(taos, (char **)sql2, sizeof(sql2) / sizeof(sql2[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + taos_free_result(pRes); + pRes = taos_query(taos, "drop database if exists td24559"); taos_free_result(pRes); @@ -2325,6 +2339,17 @@ int sml_td17324_Test() { ASSERT(code == 0); taos_free_result(pRes); + const char *sql1[] = { + "st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"pa3ssit\",c2=false,c4=4f64 1732700000394000000", + }; + + pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS); + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == 0); + taos_free_result(pRes); + taos_close(taos); return code; From 4f77274af46e84514eeaeda017d3ac79371bde90 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 23 Dec 2024 16:15:44 +0800 Subject: [PATCH 3/8] fix:[TD-33272]add test case --- source/client/test/smlTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 968a4e7c75..cbd13c6749 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -276,7 +276,7 @@ TEST(testCase, smlParseCols_Test) { info->dataFormat = false; SSmlLineInfo elements = {0}; info->msgBuf = msgBuf; - ASSERT_EQ(smlInitHandle(NULL), + ASSERT_EQ(smlInitHandle(NULL), 0), const char *data = "st,t=1 cb\\=in=\"pass\\,it " From 196ef024c534bb50b5b02cfccb5ad071d38e3556 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 23 Dec 2024 16:52:14 +0800 Subject: [PATCH 4/8] fix:[TD-33272]add test case --- source/client/test/smlTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index cbd13c6749..87be16e467 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -276,7 +276,7 @@ TEST(testCase, smlParseCols_Test) { info->dataFormat = false; SSmlLineInfo elements = {0}; info->msgBuf = msgBuf; - ASSERT_EQ(smlInitHandle(NULL), 0), + ASSERT_EQ(smlInitHandle(NULL), 0); const char *data = "st,t=1 cb\\=in=\"pass\\,it " From 6244712887e79015240bef5cf21f3398ae88d505 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 23 Dec 2024 18:12:47 +0800 Subject: [PATCH 5/8] fix:[TD-33272]add test case --- source/dnode/vnode/src/tq/tqOffset.c | 3 +- source/dnode/vnode/test/tqTest.cpp | 4 +- .../7-tmq/tmqVnodeSplit-ntb-select.py | 192 ++++++++++++++++++ 3 files changed, 195 insertions(+), 4 deletions(-) create mode 100644 tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 57a901a2e1..a131f30a04 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -26,8 +26,7 @@ int32_t tqBuildFName(char** data, const char* path, char* name) { int32_t len = strlen(path) + strlen(name) + 2; fname = taosMemoryCalloc(1, len); TSDB_CHECK_NULL(fname, code, lino, END, terrno); - code = tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); - TSDB_CHECK_CODE(code, lino, END); + (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); *data = fname; fname = NULL; diff --git a/source/dnode/vnode/test/tqTest.cpp b/source/dnode/vnode/test/tqTest.cpp index 1b5fe4fdcd..d757aa6d43 100644 --- a/source/dnode/vnode/test/tqTest.cpp +++ b/source/dnode/vnode/test/tqTest.cpp @@ -37,11 +37,11 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } -STqOffset offset = {.subKey = "testtest", .val = {.type = TMQ_OFFSET__LOG, .version = 8923}}; - void tqWriteOffset() { TdFilePtr pFile = taosOpenFile(TQ_OFFSET_NAME, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + STqOffset offset = {.val = {.type = TMQ_OFFSET__LOG, .version = 8923}}; + strcpy(offset.subKey, "testtest"); int32_t bodyLen; int32_t code; tEncodeSize(tEncodeSTqOffset, &offset, bodyLen, code); diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py new file mode 100644 index 0000000000..8f8eb7d4ea --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py @@ -0,0 +1,192 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + + + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 120, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tdSql.query("create table dbt.t(ts timestamp, v int)") + tdSql.query("insert into dbt.t values('2022-01-01 00:00:00.000', 0)") + tdSql.query("insert into dbt.t values('2022-01-01 00:00:02.000', 0)") + tdSql.query("insert into dbt.t values('2022-01-01 00:00:03.000', 0)") + return + + def restartAndRemoveWal(self, deleteWal): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def splitVgroups(self): + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + vnodeId = result[1] + tdLog.debug("vnode is %d"%(vnodeId)) + break + splitSql = "split vgroup %d" %(vnodeId) + tdLog.debug("splitSql:%s"%(splitSql)) + tdSql.query(splitSql) + tdLog.debug("splitSql ok") + + def tmqCase1(self, deleteWal=False): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 2, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from ntb with filter") + queryString = "select * from %s.t"%(paraDict['dbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + self.restartAndRemoveWal(deleteWal) + + # split vgroup + self.splitVgroups() + + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + + time.sleep(3) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1(True) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 4b62a9a5872ab49ed7a5afbf470101d90096bc95 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 23 Dec 2024 19:30:36 +0800 Subject: [PATCH 6/8] fix:[TD-33272]add test case --- source/dnode/vnode/src/tq/tqSnapshot.c | 2 +- tests/parallel_test/cases.task | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index ddbbba57a0..ce5008e344 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -89,7 +89,7 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { TSDB_CHECK_NULL(ppData, code, lino, end, TSDB_CODE_INVALID_MSG); code = tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen); - TSDB_CHECK_CODE(code, lino, end); + TSDB_CHECK_CONDITION(code == 0, code, lino, end, TDB_CODE_SUCCESS); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); TSDB_CHECK_NULL(*ppData, code, lino, end, terrno); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c9d28e0623..1a713bff5f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -338,6 +338,7 @@ ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 3 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-ntb-select.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-false.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-false.py -N 3 -n 3 From e65a3802a23223b54d252d5dd1a289dded3797d7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 24 Dec 2024 09:21:09 +0800 Subject: [PATCH 7/8] fix:[TD-33272]add test case --- source/client/test/smlTest.cpp | 2 +- source/dnode/vnode/src/tq/tqOffset.c | 2 +- source/dnode/vnode/test/CMakeLists.txt | 30 ++++++++++++++------------ source/libs/parser/src/parInsertSml.c | 6 +++--- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 87be16e467..d64cd9c971 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -276,7 +276,7 @@ TEST(testCase, smlParseCols_Test) { info->dataFormat = false; SSmlLineInfo elements = {0}; info->msgBuf = msgBuf; - ASSERT_EQ(smlInitHandle(NULL), 0); + ASSERT_EQ(smlInitHandle(NULL), TSDB_CODE_INVALID_PARA); const char *data = "st,t=1 cb\\=in=\"pass\\,it " diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index a131f30a04..6996f3578c 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -95,7 +95,7 @@ END: if (code != 0){ tqError("%s failed at %d since %s", __func__, lino, tstrerror(code)); } - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); taosMemoryFree(pMemBuf); tDeleteSTqOffset(pOffset); diff --git a/source/dnode/vnode/test/CMakeLists.txt b/source/dnode/vnode/test/CMakeLists.txt index 826296e99f..ff7e5a2196 100644 --- a/source/dnode/vnode/test/CMakeLists.txt +++ b/source/dnode/vnode/test/CMakeLists.txt @@ -3,23 +3,25 @@ MESSAGE(STATUS "tq unit test") # GoogleTest requires at least C++11 SET(CMAKE_CXX_STANDARD 11) -add_executable(tqTest tqTest.cpp) -target_include_directories(tqTest - PUBLIC - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) +IF(NOT TD_WINDOWS) + add_executable(tqTest tqTest.cpp) + target_include_directories(tqTest + PUBLIC + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" + ) -TARGET_LINK_LIBRARIES( - tqTest - PUBLIC os util common vnode gtest_main -) + TARGET_LINK_LIBRARIES( + tqTest + PUBLIC os util common vnode gtest_main + ) -enable_testing() + enable_testing() -add_test( - NAME tq_test - COMMAND tqTest -) + add_test( + NAME tq_test + COMMAND tqTest + ) +ENDIF() # ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp) # TARGET_LINK_LIBRARIES( diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index bf86ef718e..676aed4464 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -123,7 +123,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem val.pData = (uint8_t*)kv->value; val.nData = kv->length; } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { - code = smlMbsToUcs4(kv->value, kv->length, (void**)&val.pData, &val.nData, kv->length * TSDB_NCHAR_SIZE, charsetCxt); + code = smlMbsToUcs4(kv->value, kv->length, (void**)&val.pData, (int32_t*)&val.nData, kv->length * TSDB_NCHAR_SIZE, charsetCxt); TSDB_CHECK_CODE(code, lino, end); } else { (void)memcpy(&val.i64, &(kv->value), kv->length); @@ -316,7 +316,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); } if (kv->type == TSDB_DATA_TYPE_NCHAR) { - ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, &pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt); + ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, (int32_t*)&pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt); TSDB_CHECK_CODE(ret, lino, end); } else if (kv->type == TSDB_DATA_TYPE_BINARY) { pVal->value.nData = kv->length; @@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc end: if (ret != 0){ uError("%s failed at %d since %s", __func__, lino, tstrerror(ret)); - buildInvalidOperationMsg(&pBuf, tstrerror(ret)); + ret = buildInvalidOperationMsg(&pBuf, tstrerror(ret)); } insDestroyBoundColInfo(&bindTags); tdDestroySVCreateTbReq(pCreateTblReq); From 20c1e28d0c56707972f667c1c1bc21f83bc61023 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 24 Dec 2024 10:08:02 +0800 Subject: [PATCH 8/8] fix:[TD-33272]add test case --- source/dnode/vnode/src/tq/tq.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 37f3572f65..73052c1e5e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -151,10 +151,8 @@ void tqClose(STQ* pTq) { taosHashCleanup(pTq->pOffset); taosMemoryFree(pTq->path); tqMetaClose(pTq); - - streamMetaClose(pTq->pStreamMeta); - qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); + streamMetaClose(pTq->pStreamMeta); taosMemoryFree(pTq); }