Merge pull request #29288 from taosdata/fix/TD-33272

fix:[TD-33272]add test case
This commit is contained in:
Shengliang Guan 2024-12-24 17:45:04 +08:00 committed by GitHub
commit 4aeda7d062
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 620 additions and 447 deletions

View File

@ -276,6 +276,7 @@ TEST(testCase, smlParseCols_Test) {
info->dataFormat = false; info->dataFormat = false;
SSmlLineInfo elements = {0}; SSmlLineInfo elements = {0};
info->msgBuf = msgBuf; info->msgBuf = msgBuf;
ASSERT_EQ(smlInitHandle(NULL), TSDB_CODE_INVALID_PARA);
const char *data = const char *data =
"st,t=1 cb\\=in=\"pass\\,it " "st,t=1 cb\\=in=\"pass\\,it "

View File

@ -11141,6 +11141,7 @@ void tOffsetCopy(STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
} }
void tOffsetDestroy(void *param) { void tOffsetDestroy(void *param) {
if (param == NULL) return;
STqOffsetVal *pVal = (STqOffsetVal *)param; STqOffsetVal *pVal = (STqOffsetVal *)param;
if (IS_VAR_DATA_TYPE(pVal->primaryKey.type)) { if (IS_VAR_DATA_TYPE(pVal->primaryKey.type)) {
taosMemoryFreeClear(pVal->primaryKey.pData); taosMemoryFreeClear(pVal->primaryKey.pData);
@ -11148,6 +11149,7 @@ void tOffsetDestroy(void *param) {
} }
void tDeleteSTqOffset(void *param) { void tDeleteSTqOffset(void *param) {
if (param == NULL) return;
STqOffset *pVal = (STqOffset *)param; STqOffset *pVal = (STqOffset *)param;
tOffsetDestroy(&pVal->val); tOffsetDestroy(&pVal->val);
} }

View File

@ -151,11 +151,8 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->pOffset); taosHashCleanup(pTq->pOffset);
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1);
int32_t vgId = pTq->pStreamMeta->vgId;
streamMetaClose(pTq->pStreamMeta); streamMetaClose(pTq->pStreamMeta);
qDebug("vgId:%d end to close tq", vgId);
taosMemoryFree(pTq); taosMemoryFree(pTq);
} }

View File

@ -17,36 +17,41 @@
#include "tq.h" #include "tq.h"
int32_t tqBuildFName(char** data, const char* path, char* name) { int32_t tqBuildFName(char** data, const char* path, char* name) {
if (data == NULL || path == NULL || name == NULL) { int32_t code = 0;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
} char* fname = NULL;
TSDB_CHECK_NULL(data, code, lino, END, TSDB_CODE_INVALID_MSG);
TSDB_CHECK_NULL(path, code, lino, END, TSDB_CODE_INVALID_MSG);
TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
int32_t len = strlen(path) + strlen(name) + 2; int32_t len = strlen(path) + strlen(name) + 2;
char* fname = taosMemoryCalloc(1, len); fname = taosMemoryCalloc(1, len);
if(fname == NULL) { TSDB_CHECK_NULL(fname, code, lino, END, terrno);
return terrno; (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
}
int32_t code = tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
if (code < 0){
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(fname);
return code;
}
*data = fname; *data = fname;
return TDB_CODE_SUCCESS; fname = NULL;
END:
if (code != 0){
tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFree(fname);
return code;
} }
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
if (pTq == NULL || name == NULL) { int32_t code = TDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
} void* pMemBuf = NULL;
int32_t code = TDB_CODE_SUCCESS; TdFilePtr pFile = NULL;
void* pMemBuf = NULL; STqOffset *pOffset = NULL;
void *pIter = NULL;
TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ); TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_MSG);
if (pFile == NULL) { TSDB_CHECK_NULL(name, code, lino, END, TSDB_CODE_INVALID_MSG);
code = TDB_CODE_SUCCESS;
goto END; pFile = taosOpenFile(name, TD_FILE_READ);
} TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS);
int64_t ret = 0; int64_t ret = 0;
int32_t size = 0; int32_t size = 0;
@ -60,48 +65,41 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
} }
total += INT_BYTES; total += INT_BYTES;
size = htonl(size); size = htonl(size);
if (size <= 0) { TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG);
code = TSDB_CODE_INVALID_MSG;
goto END;
}
pMemBuf = taosMemoryCalloc(1, size);
if (pMemBuf == NULL) {
code = terrno;
goto END;
}
if (taosReadFile(pFile, pMemBuf, size) != size) { pMemBuf = taosMemoryCalloc(1, size);
terrno = TSDB_CODE_INVALID_MSG; TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno);
goto END; TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG);
}
total += size; total += size;
STqOffset offset = {0}; STqOffset offset = {0};
TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size)); code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size);
code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)); TSDB_CHECK_CODE(code, lino, END);
if (code != TDB_CODE_SUCCESS) { pOffset = &offset;
tDeleteSTqOffset(&offset); code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
goto END; TSDB_CHECK_CODE(code, lino, END);
} pOffset = NULL;
tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset), offset.subKey); tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset), offset.subKey);
taosMemoryFree(pMemBuf); taosMemoryFree(pMemBuf);
pMemBuf = NULL; pMemBuf = NULL;
} }
void *pIter = NULL;
while ((pIter = taosHashIterate(pTq->pOffset, pIter))) { while ((pIter = taosHashIterate(pTq->pOffset, pIter))) {
STqOffset* pOffset = (STqOffset*)pIter; STqOffset* offset = (STqOffset*)pIter;
code = tqMetaSaveOffset(pTq, pOffset); code = tqMetaSaveOffset(pTq, offset);
if(code != 0){ TSDB_CHECK_CODE(code, lino, END);
taosHashCancelIterate(pTq->pOffset, pIter);
goto END;
}
} }
END: END:
taosCloseFile(&pFile); if (code != 0){
tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
(void)taosCloseFile(&pFile);
taosMemoryFree(pMemBuf); taosMemoryFree(pMemBuf);
tDeleteSTqOffset(pOffset);
taosHashCancelIterate(pTq->pOffset, pIter);
return code; return code;
} }

View File

@ -27,18 +27,16 @@ struct STqSnapReader {
}; };
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) { int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
if (pTq == NULL || ppReader == NULL) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
}
int32_t code = 0;
STqSnapReader* pReader = NULL; STqSnapReader* pReader = NULL;
TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
TSDB_CHECK_NULL(ppReader, code, lino, end, TSDB_CODE_INVALID_MSG);
// alloc // alloc
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
if (pReader == NULL) { TSDB_CHECK_NULL(pReader, code, lino, end, terrno);
code = terrno;
goto _err;
}
pReader->pTq = pTq; pReader->pTq = pTq;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
@ -54,28 +52,21 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqS
pTb = pTq->pOffsetStore; pTb = pTq->pOffsetStore;
} else { } else {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _err; goto end;
} }
code = tdbTbcOpen(pTb, &pReader->pCur, NULL); code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
if (code) { TSDB_CHECK_CODE(code, lino, end);
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur); code = tdbTbcMoveToFirst(pReader->pCur);
if (code) { TSDB_CHECK_CODE(code, lino, end);
taosMemoryFree(pReader); tqInfo("vgId:%d, vnode tq snapshot reader opene success", TD_VID(pTq->pVnode));
goto _err; *ppReader = pReader;
end:
if (code != 0){
tqError("%s failed at %d, vnode tq snapshot reader open failed since %s", __func__, lino, tstrerror(code));
taosMemoryFreeClear(pReader);
} }
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code; return code;
} }
@ -84,45 +75,37 @@ void tqSnapReaderClose(STqSnapReader** ppReader) {
return; return;
} }
tdbTbcClose((*ppReader)->pCur); tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader); taosMemoryFreeClear(*ppReader);
*ppReader = NULL;
} }
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
if (pReader == NULL || ppData == NULL) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
} void* pKey = NULL;
int32_t code = 0; void* pVal = NULL;
void* pKey = NULL; int32_t kLen = 0;
void* pVal = NULL; int32_t vLen = 0;
int32_t kLen = 0; TSDB_CHECK_NULL(pReader, code, lino, end, TSDB_CODE_INVALID_MSG);
int32_t vLen = 0; TSDB_CHECK_NULL(ppData, code, lino, end, TSDB_CODE_INVALID_MSG);
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { code = tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen);
goto _exit; TSDB_CHECK_CONDITION(code == 0, code, lino, end, TDB_CODE_SUCCESS);
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) { TSDB_CHECK_NULL(*ppData, code, lino, end, terrno);
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = pReader->type; pHdr->type = pReader->type;
pHdr->size = vLen; pHdr->size = vLen;
(void)memcpy(pHdr->data, pVal, vLen); (void)memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d, vnode tq snapshot read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
_exit: end:
if (code != 0) {
tqError("%s failed at %d, vnode tq snapshot read data failed since %s", __func__, lino, tstrerror(code));
}
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
return code;
_err:
tdbFree(pKey);
tdbFree(pVal);
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code; return code;
} }
@ -135,135 +118,148 @@ struct STqSnapWriter {
}; };
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
if (pTq == NULL || ppWriter == NULL) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
}
int32_t code = 0;
STqSnapWriter* pWriter = NULL; STqSnapWriter* pWriter = NULL;
TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
// alloc // alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { TSDB_CHECK_NULL(pWriter, code, lino, end, terrno);
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
;
goto _err;
}
pWriter->pTq = pTq; pWriter->pTq = pTq;
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0); code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
if (code < 0) { TSDB_CHECK_CODE(code, lino, end);
taosMemoryFree(pWriter); tqInfo("vgId:%d, tq snapshot writer opene success", TD_VID(pTq->pVnode));
goto _err;
}
*ppWriter = pWriter; *ppWriter = pWriter;
return code;
_err: end:
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); if (code != 0){
*ppWriter = NULL; tqError("%s failed at %d tq snapshot writer open failed since %s", __func__, lino, tstrerror(code));
taosMemoryFreeClear(pWriter);
}
return code; return code;
} }
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
if (ppWriter == NULL || *ppWriter == NULL) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
} STqSnapWriter* pWriter = NULL;
int32_t code = 0;
STqSnapWriter* pWriter = *ppWriter; TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
STQ* pTq = pWriter->pTq; TSDB_CHECK_NULL(*ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
pWriter = *ppWriter;
if (rollback) { if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, end);
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, end);
} }
tqInfo("vgId:%d, tq snapshot writer close success", TD_VID(pWriter->pTq->pVnode));
taosMemoryFreeClear(*ppWriter);
taosMemoryFree(pWriter); end:
*ppWriter = NULL; if (code != 0){
tqError("%s failed at %d, tq snapshot writer close failed since %s", __func__, lino, tstrerror(code));
return code; }
_err:
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code; return code;
} }
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tqWriteCheck(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData){
if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
TSDB_CHECK_NULL(pWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
TSDB_CHECK_NULL(pData, code, lino, end, TSDB_CODE_INVALID_MSG);
TSDB_CHECK_CONDITION(nData >= sizeof(SSnapDataHdr), code, lino, end, TSDB_CODE_INVALID_MSG);
end:
if (code != 0){
tqError("%s failed at %d failed since %s", __func__, lino, tstrerror(code));
} }
int32_t code = 0; return code;
STQ* pTq = pWriter->pTq; }
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SDecoder decoder = {0}; SDecoder decoder = {0};
SDecoder* pDecoder = &decoder; SDecoder* pDecoder = &decoder;
STqHandle handle = {0}; STqHandle handle = {0};
code = tqWriteCheck(pWriter, pData, nData);
TSDB_CHECK_CODE(code, lino, end);
STQ* pTq = pWriter->pTq;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle); code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto end; TSDB_CHECK_CODE(code, lino, end);
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pData + sizeof(SSnapDataHdr), code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr)); nData - sizeof(SSnapDataHdr));
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
TSDB_CHECK_CODE(code, lino, end);
tqInfo("vgId:%d, vnode tq snapshot write success", TD_VID(pTq->pVnode));
end: end:
tDecoderClear(pDecoder); tDecoderClear(pDecoder);
tqDestroyTqHandle(&handle); tqDestroyTqHandle(&handle);
tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code); if (code != 0){
tqError("%s failed at %d, vnode tq snapshot write failed since %s", __func__, lino, tstrerror(code));
}
return code; return code;
} }
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
}
int32_t code = 0; code = tqWriteCheck(pWriter, pData, nData);
TSDB_CHECK_CODE(code, lino, end);
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
STqCheckInfo info = {0}; STqCheckInfo info = {0};
code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code != 0) { TSDB_CHECK_CODE(code, lino, end);
goto _err;
}
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr), code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr)); nData - sizeof(SSnapDataHdr));
tDeleteSTqCheckInfo(&info); tDeleteSTqCheckInfo(&info);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, end);
tqInfo("vgId:%d, vnode tq check info write success", TD_VID(pTq->pVnode));
return code; end:
if (code != 0){
_err: tqError("%s failed at %d, vnode tq check info write failed since %s", __func__, lino, tstrerror(code));
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); }
return code; return code;
} }
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) { int code = TSDB_CODE_SUCCESS;
return TSDB_CODE_INVALID_MSG; int32_t lino = 0;
} code = tqWriteCheck(pWriter, pData, nData);
int32_t code = 0; TSDB_CHECK_CODE(code, lino, end);
STQ* pTq = pWriter->pTq;
STQ* pTq = pWriter->pTq;
STqOffset info = {0}; STqOffset info = {0};
code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code != 0) { TSDB_CHECK_CODE(code, lino, end);
goto _err;
}
code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr), code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr)); nData - sizeof(SSnapDataHdr));
tDeleteSTqOffset(&info); tDeleteSTqOffset(&info);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, end);
tqInfo("vgId:%d, vnode tq offset write success", TD_VID(pTq->pVnode));
return code; end:
if (code != 0){
_err: tqError("%s failed at %d, vnode tq offset write failed since %s", __func__, lino, tstrerror(code));
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); }
return code; return code;
} }

View File

@ -1,29 +1,27 @@
MESSAGE(STATUS "vnode unit test") MESSAGE(STATUS "tq unit test")
# GoogleTest requires at least C++11 # GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11) SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# add_executable(tqTest "") IF(NOT TD_WINDOWS)
# target_sources(tqTest add_executable(tqTest tqTest.cpp)
# PRIVATE target_include_directories(tqTest
# "tqMetaTest.cpp" PUBLIC
# ) "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# target_include_directories(tqTest )
# PUBLIC
# "${TD_SOURCE_DIR}/include/server/vnode/tq"
# "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
# target_link_libraries(tqTest TARGET_LINK_LIBRARIES(
# tq tqTest
# gtest_main PUBLIC os util common vnode gtest_main
# ) )
# enable_testing()
# add_test( enable_testing()
# NAME tq_test
# COMMAND tqTest add_test(
# ) NAME tq_test
COMMAND tqTest
)
ENDIF()
# ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp) # ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
# TARGET_LINK_LIBRARIES( # TARGET_LINK_LIBRARIES(

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <vnodeInt.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <tmsg.h>
#include <vnodeInt.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
SDmNotifyHandle dmNotifyHdl = {.state = 0};
#include "tq.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
void tqWriteOffset() {
TdFilePtr pFile = taosOpenFile(TQ_OFFSET_NAME, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
STqOffset offset = {.val = {.type = TMQ_OFFSET__LOG, .version = 8923}};
strcpy(offset.subKey, "testtest");
int32_t bodyLen;
int32_t code;
tEncodeSize(tEncodeSTqOffset, &offset, bodyLen, code);
int32_t totLen = INT_BYTES + bodyLen;
void* buf = taosMemoryCalloc(1, totLen);
void* abuf = POINTER_SHIFT(buf, INT_BYTES);
*(int32_t*)buf = htonl(bodyLen);
SEncoder encoder;
tEncoderInit(&encoder, (uint8_t*)abuf, bodyLen);
tEncodeSTqOffset(&encoder, &offset);
taosWriteFile(pFile, buf, totLen);
taosMemoryFree(buf);
taosCloseFile(&pFile);
}
TEST(testCase, tqOffsetTest) {
STQ* pTq = (STQ*)taosMemoryCalloc(1, sizeof(STQ));
pTq->path = taosStrdup("./");
pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset);
tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL);
tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0);
tqWriteOffset();
tqOffsetRestoreFromFile(pTq, TQ_OFFSET_NAME);
taosRemoveFile(TQ_OFFSET_NAME);
tqClose(pTq);
}
#pragma GCC diagnostic pop

View File

@ -20,40 +20,32 @@
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen) { int32_t msgBufLen) {
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
SToken sToken; SToken sToken = {0};
int32_t code = 0; int code = TSDB_CODE_SUCCESS;
char* tbName = NULL; int32_t lino = 0;
NEXT_TOKEN(pTableName, sToken); NEXT_TOKEN(pTableName, sToken);
TSDB_CHECK_CONDITION(sToken.n != 0, code, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
if (sToken.n == 0) {
return buildInvalidOperationMsg(&msg, "empty table name");
}
code = insCreateSName(pName, &sToken, acctId, dbName, &msg); code = insCreateSName(pName, &sToken, acctId, dbName, &msg);
if (code) { TSDB_CHECK_CODE(code, lino, end);
return code;
}
NEXT_TOKEN(pTableName, sToken); NEXT_TOKEN(pTableName, sToken);
TSDB_CHECK_CONDITION(sToken.n <= 0, code, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
if (sToken.n > 0) { end:
return buildInvalidOperationMsg(&msg, "table name format is wrong"); if (code != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) { static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
int code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) { TSDB_CHECK_NULL(pUseCols, code, lino, end, terrno);
return terrno;
}
pBoundInfo->numOfBound = 0; pBoundInfo->numOfBound = 0;
int16_t lastColIdx = -1; // last column found int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < taosArrayGetSize(cols); ++i) { for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv* kv = taosArrayGet(cols, i); SSmlKv* kv = taosArrayGet(cols, i);
@ -65,16 +57,9 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
index = insFindCol(&sToken, 0, t, pSchema); index = insFindCol(&sToken, 0, t, pSchema);
} }
if (index < 0) { TSDB_CHECK_CONDITION(index >= 0, code, lino, end, TSDB_CODE_SML_INVALID_DATA);
uError("smlBoundColumnData. index:%d", index); TSDB_CHECK_CONDITION(!pUseCols[index], code, lino, end, TSDB_CODE_SML_INVALID_DATA);
code = TSDB_CODE_SML_INVALID_DATA;
goto end;
}
if (pUseCols[index]) {
uError("smlBoundColumnData. already set. index:%d", index);
code = TSDB_CODE_SML_INVALID_DATA;
goto end;
}
lastColIdx = index; lastColIdx = index;
pUseCols[index] = true; pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
@ -82,11 +67,30 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
} }
end: end:
if (code != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFree(pUseCols); taosMemoryFree(pUseCols);
return code; return code;
} }
static int32_t smlMbsToUcs4(const char* mbs, size_t mbsLen, void** result, int32_t* resultLen, int32_t maxLen, void* charsetCxt){
int code = TSDB_CODE_SUCCESS;
void* pUcs4 = NULL;
int32_t lino = 0;
pUcs4 = taosMemoryCalloc(1, maxLen);
TSDB_CHECK_NULL(pUcs4, code, lino, end, terrno);
TSDB_CHECK_CONDITION(taosMbsToUcs4(mbs, mbsLen, (TdUcs4*)pUcs4, maxLen, resultLen, charsetCxt), code, lino, end, terrno);
*result = pUcs4;
pUcs4 = NULL;
end:
if (code != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFree(pUcs4);
return code;
}
/** /**
* @brief No json tag for schemaless * @brief No json tag for schemaless
* *
@ -99,75 +103,39 @@ end:
*/ */
static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName, static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
SMsgBuf* msg, void* charsetCxt) { SMsgBuf* msg, void* charsetCxt) {
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal)); int code = TSDB_CODE_SUCCESS;
if (!pTagArray) { int32_t lino = 0;
return terrno; SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
} TSDB_CHECK_NULL(pTagArray, code, lino, end, terrno);
*tagName = taosArrayInit(8, TSDB_COL_NAME_LEN); *tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
if (!*tagName) { TSDB_CHECK_NULL(*tagName, code, lino, end, terrno);
return terrno;
}
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < tags->numOfBound; ++i) { for (int i = 0; i < tags->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]]; SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
SSmlKv* kv = taosArrayGet(cols, i); SSmlKv* kv = taosArrayGet(cols, i);
if (kv == NULL){ TSDB_CHECK_NULL(kv, code, lino, end, terrno);
code = terrno; bool cond = (kv->keyLen == strlen(pTagSchema->name) && memcmp(kv->key, pTagSchema->name, kv->keyLen) == 0 && kv->type == pTagSchema->type);
uError("SML smlBuildTagRow error kv is null"); TSDB_CHECK_CONDITION(cond, code, lino, end, TSDB_CODE_SML_INVALID_DATA);
goto end; TSDB_CHECK_NULL(taosArrayPush(*tagName, pTagSchema->name), code, lino, end, terrno);
}
if (kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 ||
kv->type != pTagSchema->type) {
code = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
goto end;
}
if (taosArrayPush(*tagName, pTagSchema->name) == NULL){
code = terrno;
uError("SML smlBuildTagRow error push tag name");
goto end;
}
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
// strcpy(val.colName, pTagSchema->name);
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY || if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY ||
pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) { pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
val.pData = (uint8_t*)kv->value; val.pData = (uint8_t*)kv->value;
val.nData = kv->length; val.nData = kv->length;
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) { } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
int32_t output = 0; code = smlMbsToUcs4(kv->value, kv->length, (void**)&val.pData, (int32_t*)&val.nData, kv->length * TSDB_NCHAR_SIZE, charsetCxt);
void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE); TSDB_CHECK_CODE(code, lino, end);
if (p == NULL) {
code = terrno;
goto end;
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output, charsetCxt)) {
if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
taosMemoryFree(p);
code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
goto end;
}
char buf[512] = {0};
(void)snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(terrno));
taosMemoryFree(p);
code = buildSyntaxErrMsg(msg, buf, kv->value);
goto end;
}
val.pData = p;
val.nData = output;
} else { } else {
(void)memcpy(&val.i64, &(kv->value), kv->length); (void)memcpy(&val.i64, &(kv->value), kv->length);
} }
if (taosArrayPush(pTagArray, &val) == NULL){ TSDB_CHECK_NULL(taosArrayPush(pTagArray, &val), code, lino, end, terrno);
code = terrno;
uError("SML smlBuildTagRow error push tag array");
goto end;
}
} }
code = tTagNew(pTagArray, 1, false, ppTag); code = tTagNew(pTagArray, 1, false, ppTag);
end: end:
if (code != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i); STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
if (p->type == TSDB_DATA_TYPE_NCHAR) { if (p->type == TSDB_DATA_TYPE_NCHAR) {
@ -179,18 +147,20 @@ end:
} }
int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt) { int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt) {
int ret = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SVCreateTbReq* pCreateTbReq = NULL; SVCreateTbReq* pCreateTbReq = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, cxt, false, false); sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, cxt, false, false);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
return ret;
}
ret = initTableColSubmitData(*cxt); ret = initTableColSubmitData(*cxt);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
return ret;
end:
if (ret != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
} }
return TSDB_CODE_SUCCESS; return ret;
} }
void clearColValArraySml(SArray* pCols) { void clearColValArraySml(SArray* pCols) {
@ -207,78 +177,51 @@ void clearColValArraySml(SArray* pCols) {
} }
int32_t smlBuildRow(STableDataCxt* pTableCxt) { int32_t smlBuildRow(STableDataCxt* pTableCxt) {
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); int ret = TSDB_CODE_SUCCESS;
if (pRow == NULL){ int32_t lino = 0;
return terrno; SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
} TSDB_CHECK_NULL(pRow, ret, lino, end, terrno);
int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS != ret) { TSDB_CHECK_CODE(ret, lino, end);
return ret;
}
SRowKey key; SRowKey key;
tRowGetKey(*pRow, &key); tRowGetKey(*pRow, &key);
insCheckTableDataOrder(pTableCxt, &key); insCheckTableDataOrder(pTableCxt, &key);
return TSDB_CODE_SUCCESS; end:
if (ret != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
}
return ret;
} }
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index, void* charsetCxt) { int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index, void* charsetCxt) {
int ret = TSDB_CODE_SUCCESS; int ret = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSchema* pColSchema = schema + index; SSchema* pColSchema = schema + index;
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index); SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
if (pVal == NULL) { TSDB_CHECK_NULL(pVal, ret, lino, end, TSDB_CODE_SUCCESS);
return TSDB_CODE_SUCCESS;
}
SSmlKv* kv = (SSmlKv*)data; SSmlKv* kv = (SSmlKv*)data;
if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 ||
kv->type != pColSchema->type) { kv->type != pColSchema->type) {
ret = TSDB_CODE_SML_INVALID_DATA; ret = TSDB_CODE_SML_INVALID_DATA;
char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1); char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1);
if (tmp) { TSDB_CHECK_NULL(tmp, ret, lino, end, terrno);
(void)memcpy(tmp, kv->key, kv->keyLen); (void)memcpy(tmp, kv->key, kv->keyLen);
uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name, uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name,
pColSchema->name, tDataTypes[pColSchema->type].name); pColSchema->name, tDataTypes[pColSchema->type].name);
taosMemoryFree(tmp); taosMemoryFree(tmp);
} else {
uError("SML smlBuildCol out of memory");
ret = terrno;
}
goto end; goto end;
} }
if (kv->type == TSDB_DATA_TYPE_NCHAR) { if (kv->type == TSDB_DATA_TYPE_NCHAR) {
int32_t len = 0; ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, &pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt);
int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE; TSDB_CHECK_CODE(ret, lino, end);
if (size <= 0) {
ret = TSDB_CODE_SML_INVALID_DATA;
goto end;
}
char* pUcs4 = taosMemoryCalloc(1, size);
if (NULL == pUcs4) {
ret = terrno;
goto end;
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, size, &len, charsetCxt)) {
if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
taosMemoryFree(pUcs4);
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end;
}
taosMemoryFree(pUcs4);
ret = TSDB_CODE_TSC_INVALID_VALUE;
goto end;
}
pVal->value.pData = pUcs4;
pVal->value.nData = len;
} else if (kv->type == TSDB_DATA_TYPE_BINARY) { } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
pVal->value.nData = kv->length; pVal->value.nData = kv->length;
pVal->value.pData = (uint8_t*)kv->value; pVal->value.pData = (uint8_t*)kv->value;
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) { } else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) {
pVal->value.nData = kv->length; pVal->value.nData = kv->length;
pVal->value.pData = taosMemoryMalloc(kv->length); pVal->value.pData = taosMemoryMalloc(kv->length);
if (!pVal->value.pData) { TSDB_CHECK_NULL(pVal->value.pData, ret, lino, end, terrno);
ret = terrno;
uError("SML smlBuildCol malloc failed %s:%d, err: %s", __func__, __LINE__, tstrerror(ret));
goto end;
}
(void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length); (void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
} else { } else {
(void)memcpy(&pVal->value.val, &(kv->value), kv->length); (void)memcpy(&pVal->value.val, &(kv->value), kv->length);
@ -286,12 +229,17 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
pVal->flag = CV_FLAG_VALUE; pVal->flag = CV_FLAG_VALUE;
end: end:
if (ret != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
}
return ret; return ret;
} }
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols,
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen, void* charsetCxt) { char* msgBuf, int32_t msgBufLen, void* charsetCxt) {
int32_t lino = 0;
int32_t ret = 0;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSchema* pTagsSchema = getTableTagSchema(pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
@ -299,50 +247,32 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
SVCreateTbReq* pCreateTblReq = NULL; SVCreateTbReq* pCreateTblReq = NULL;
SArray* tagName = NULL; SArray* tagName = NULL;
int ret = insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags); ret = insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
ret = buildInvalidOperationMsg(&pBuf, "init bound cols error");
goto end;
}
ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true); ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
ret = buildInvalidOperationMsg(&pBuf, "bound tags error");
goto end;
}
STag* pTag = NULL; STag* pTag = NULL;
ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf, charsetCxt); ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf, charsetCxt);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
goto end;
}
pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pCreateTblReq) { TSDB_CHECK_NULL(pCreateTblReq, ret, lino, end, terrno);
ret = terrno;
goto end;
}
ret = insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, ret = insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
pTableMeta->tableInfo.numOfTags, ttl); pTableMeta->tableInfo.numOfTags, ttl);
if (TSDB_CODE_SUCCESS != ret) { TSDB_CHECK_CODE(ret, lino, end);
goto end;
}
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1); pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
if (pCreateTblReq->ctb.stbName == NULL){ TSDB_CHECK_NULL(pCreateTblReq->ctb.stbName, ret, lino, end, terrno);
ret = terrno;
goto end;
}
(void)memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen); (void)memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
if (dataFormat) { if (dataFormat) {
STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj,
&pTableMeta->uid, sizeof(pTableMeta->uid)); &pTableMeta->uid, sizeof(pTableMeta->uid));
if (NULL == pTableCxt) { TSDB_CHECK_NULL(pTableCxt, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error");
goto end;
}
(*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; (*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
(*pTableCxt)->pData->pCreateTbReq = pCreateTblReq; (*pTableCxt)->pData->pCreateTbReq = pCreateTblReq;
(*pTableCxt)->pMeta->uid = pTableMeta->uid; (*pTableCxt)->pMeta->uid = pTableMeta->uid;
@ -354,86 +284,47 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false); sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
ret = buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
goto end;
}
SSchema* pSchema = getTableColumnSchema(pTableMeta); SSchema* pSchema = getTableColumnSchema(pTableMeta);
ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false); ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
ret = buildInvalidOperationMsg(&pBuf, "bound cols error");
goto end;
}
ret = initTableColSubmitData(pTableCxt); ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(ret, lino, end);
ret = buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error");
goto end;
}
int32_t rowNum = taosArrayGetSize(cols); int32_t rowNum = taosArrayGetSize(cols);
if (rowNum <= 0) { TSDB_CHECK_CONDITION(rowNum > 0, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0");
goto end;
}
for (int32_t r = 0; r < rowNum; ++r) { for (int32_t r = 0; r < rowNum; ++r) {
void* rowData = taosArrayGetP(cols, r); void* rowData = taosArrayGetP(cols, r);
if (rowData == NULL) { TSDB_CHECK_NULL(rowData, ret, lino, end, terrno);
ret = terrno;
goto end;
}
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) { for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]]; SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]); SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
if (pVal == NULL) { TSDB_CHECK_NULL(pVal, ret, lino, end, terrno);
ret = terrno;
goto end;
}
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
if (p == NULL) { if (p == NULL) {
continue; continue;
} }
SSmlKv* kv = *(SSmlKv**)p; SSmlKv* kv = *(SSmlKv**)p;
if (kv->type != pColSchema->type) { TSDB_CHECK_CONDITION(kv->type == pColSchema->type, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
goto end;
}
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
} }
if (kv->type == TSDB_DATA_TYPE_NCHAR) { if (kv->type == TSDB_DATA_TYPE_NCHAR) {
int32_t len = 0; ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, (int32_t*)&pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt);
char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE); TSDB_CHECK_CODE(ret, lino, end);
if (NULL == pUcs4) {
ret = terrno;
goto end;
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len, charsetCxt)) {
if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length,
pColSchema->bytes, kv->value);
(void)buildInvalidOperationMsg(&pBuf, "value too long");
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end;
}
ret = buildInvalidOperationMsg(&pBuf, strerror(terrno));
goto end;
}
pVal->value.pData = pUcs4;
pVal->value.nData = len;
} else if (kv->type == TSDB_DATA_TYPE_BINARY) { } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
pVal->value.nData = kv->length; pVal->value.nData = kv->length;
pVal->value.pData = (uint8_t*)kv->value; pVal->value.pData = (uint8_t*)kv->value;
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) { } else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) {
pVal->value.nData = kv->length; pVal->value.nData = kv->length;
pVal->value.pData = taosMemoryMalloc(kv->length); pVal->value.pData = taosMemoryMalloc(kv->length);
if (NULL == pVal->value.pData) { TSDB_CHECK_NULL(pVal->value.pData, ret, lino, end, terrno);
ret = terrno;
goto end;
}
(void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length); (void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
} else { } else {
(void)memcpy(&pVal->value.val, &(kv->value), kv->length); (void)memcpy(&pVal->value.val, &(kv->value), kv->length);
@ -442,22 +333,20 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
} }
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
if (NULL == pRow) { TSDB_CHECK_NULL(pRow, ret, lino, end, terrno);
ret = terrno;
goto end;
}
ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS != ret) { TSDB_CHECK_CODE(ret, lino, end);
ret = buildInvalidOperationMsg(&pBuf, "tRowBuild error"); SRowKey key = {0};
goto end;
}
SRowKey key;
tRowGetKey(*pRow, &key); tRowGetKey(*pRow, &key);
insCheckTableDataOrder(pTableCxt, &key); insCheckTableDataOrder(pTableCxt, &key);
clearColValArraySml(pTableCxt->pValues); clearColValArraySml(pTableCxt->pValues);
} }
end: end:
if (ret != 0){
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
ret = buildInvalidOperationMsg(&pBuf, tstrerror(ret));
}
insDestroyBoundColInfo(&bindTags); insDestroyBoundColInfo(&bindTags);
tdDestroySVCreateTbReq(pCreateTblReq); tdDestroySVCreateTbReq(pCreateTblReq);
taosMemoryFree(pCreateTblReq); taosMemoryFree(pCreateTblReq);
@ -466,29 +355,22 @@ end:
} }
int32_t smlInitHandle(SQuery** query) { int32_t smlInitHandle(SQuery** query) {
*query = NULL; int32_t lino = 0;
int32_t code = 0;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SVnodeModifyOpStmt* stmt = NULL; SVnodeModifyOpStmt* stmt = NULL;
TSDB_CHECK_NULL(query, code, lino, end, TSDB_CODE_INVALID_PARA);
int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery); *query = NULL;
if (code != 0) { code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
uError("SML create pQuery error"); TSDB_CHECK_CODE(code, lino, end);
goto END;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false; pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT; pQuery->msgType = TDMT_VND_SUBMIT;
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt); code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
if (code != 0) { TSDB_CHECK_CODE(code, lino, end);
uError("SML create SVnodeModifyOpStmt error");
goto END;
}
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (stmt->pTableBlockHashObj == NULL){ TSDB_CHECK_NULL(stmt->pTableBlockHashObj, code, lino, end, terrno);
uError("SML create pTableBlockHashObj error");
code = terrno;
goto END;
}
stmt->freeHashFunc = insDestroyTableDataCxtHashMap; stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
stmt->freeArrayFunc = insDestroyVgroupDataCxtList; stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
@ -496,24 +378,28 @@ int32_t smlInitHandle(SQuery** query) {
*query = pQuery; *query = pQuery;
return code; return code;
END: end:
if (code != 0) {
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
}
nodesDestroyNode((SNode*)stmt); nodesDestroyNode((SNode*)stmt);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
return code; return code;
} }
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) { int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
int32_t lino = 0;
int32_t code = 0;
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot; SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
// merge according to vgId code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true);
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true); TSDB_CHECK_CODE(code, lino, end);
if (code != TSDB_CODE_SUCCESS) {
uError("insMergeTableDataCxt failed");
return code;
}
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false); code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
if (code != TSDB_CODE_SUCCESS) { TSDB_CHECK_CODE(code, lino, end);
uError("insBuildVgDataBlocks failed");
return code; end:
if (code != 0) {
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
} }
return code; return code;
} }

View File

@ -405,8 +405,6 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4
size_t ucs4_input_len = mbsLength; size_t ucs4_input_len = mbsLength;
size_t outLeft = ucs4_max_len; size_t outLeft = ucs4_max_len;
if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s %d %d", strerror(terrno), errno, EILSEQ);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseConv(idx, conv, M2C, charsetCxt); taosReleaseConv(idx, conv, M2C, charsetCxt);
return false; return false;

View File

@ -338,6 +338,7 @@
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-ntb-select.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-false.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-false.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-false.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-false.py -N 3 -n 3

View File

@ -0,0 +1,192 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
from util.cluster import *
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import clusterComCheck
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
def getDataPath(self):
selfPath = tdCom.getBuildPath()
return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*';
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 120,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdCom.drop_all_db()
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
tdLog.info("create stb")
tdSql.query("create table dbt.t(ts timestamp, v int)")
tdSql.query("insert into dbt.t values('2022-01-01 00:00:00.000', 0)")
tdSql.query("insert into dbt.t values('2022-01-01 00:00:02.000', 0)")
tdSql.query("insert into dbt.t values('2022-01-01 00:00:03.000', 0)")
return
def restartAndRemoveWal(self, deleteWal):
tdDnodes = cluster.dnodes
tdSql.query("select * from information_schema.ins_vnodes")
for result in tdSql.queryResult:
if result[2] == 'dbt':
tdLog.debug("dnode is %d"%(result[0]))
dnodeId = result[0]
vnodeId = result[1]
tdDnodes[dnodeId - 1].stoptaosd()
time.sleep(1)
dataPath = self.getDataPath()
dataPath = dataPath%(dnodeId,vnodeId)
tdLog.debug("dataPath:%s"%dataPath)
if deleteWal:
if os.system('rm -rf ' + dataPath) != 0:
tdLog.exit("rm error")
tdDnodes[dnodeId - 1].starttaosd()
time.sleep(1)
break
tdLog.debug("restart dnode ok")
def splitVgroups(self):
tdSql.query("select * from information_schema.ins_vnodes")
vnodeId = 0
for result in tdSql.queryResult:
if result[2] == 'dbt':
vnodeId = result[1]
tdLog.debug("vnode is %d"%(vnodeId))
break
splitSql = "split vgroup %d" %(vnodeId)
tdLog.debug("splitSql:%s"%(splitSql))
tdSql.query(splitSql)
tdLog.debug("splitSql ok")
def tmqCase1(self, deleteWal=False):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 2,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
# expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from ntb with filter")
queryString = "select * from %s.t"%(paraDict['dbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim()
#restart dnode & remove wal
self.restartAndRemoveWal(deleteWal)
# split vgroup
self.splitVgroups()
clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
time.sleep(3)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
self.prepareTestEnv()
self.tmqCase1(True)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -1939,6 +1939,20 @@ int sml_td24559_Test() {
} }
taos_free_result(pRes); taos_free_result(pRes);
const char *sql2[] = {
"stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299375000",
};
pRes = taos_query(taos, "use td24559");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql2, sizeof(sql2) / sizeof(sql2[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
pRes = taos_query(taos, "drop database if exists td24559"); pRes = taos_query(taos, "drop database if exists td24559");
taos_free_result(pRes); taos_free_result(pRes);
@ -2325,6 +2339,17 @@ int sml_td17324_Test() {
ASSERT(code == 0); ASSERT(code == 0);
taos_free_result(pRes); taos_free_result(pRes);
const char *sql1[] = {
"st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"pa3ssit\",c2=false,c4=4f64 1732700000394000000",
};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == 0);
taos_free_result(pRes);
taos_close(taos); taos_close(taos);
return code; return code;