fix:[TD-33272]refactor code
This commit is contained in:
parent
1ce6f4a383
commit
d927e8c31a
|
@ -40,13 +40,11 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
int32_t code = TDB_CODE_SUCCESS;
|
int32_t code = TDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
void* pMemBuf = NULL;
|
void* pMemBuf = NULL;
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ);
|
TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
TSDB_CHECK_NULL(pFile, code, lino, END, TDB_CODE_SUCCESS);
|
||||||
code = TDB_CODE_SUCCESS;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
|
@ -60,24 +58,16 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
|
||||||
}
|
}
|
||||||
total += INT_BYTES;
|
total += INT_BYTES;
|
||||||
size = htonl(size);
|
size = htonl(size);
|
||||||
if (size <= 0) {
|
TSDB_CHECK_CONDITION(size > 0, code, lino, END, TSDB_CODE_INVALID_MSG);
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
pMemBuf = taosMemoryCalloc(1, size);
|
|
||||||
if (pMemBuf == NULL) {
|
|
||||||
code = terrno;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosReadFile(pFile, pMemBuf, size) != size) {
|
pMemBuf = taosMemoryCalloc(1, size);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
TSDB_CHECK_NULL(pMemBuf, code, lino, END, terrno);
|
||||||
goto END;
|
TSDB_CHECK_CONDITION(taosReadFile(pFile, pMemBuf, size) == size, code, lino, END, TSDB_CODE_INVALID_MSG);
|
||||||
}
|
|
||||||
|
|
||||||
total += size;
|
total += size;
|
||||||
STqOffset offset = {0};
|
STqOffset offset = {0};
|
||||||
TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size));
|
code = tqMetaDecodeOffsetInfo(&offset, pMemBuf, size);
|
||||||
|
TSDB_CHECK_CODE(code, lino, END);
|
||||||
code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset));
|
code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset));
|
||||||
if (code != TDB_CODE_SUCCESS) {
|
if (code != TDB_CODE_SUCCESS) {
|
||||||
tDeleteSTqOffset(&offset);
|
tDeleteSTqOffset(&offset);
|
||||||
|
@ -100,6 +90,9 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
if (code != 0){
|
||||||
|
tqError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
taosMemoryFree(pMemBuf);
|
taosMemoryFree(pMemBuf);
|
||||||
|
|
||||||
|
|
|
@ -27,18 +27,16 @@ struct STqSnapReader {
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
|
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
|
||||||
if (pTq == NULL || ppReader == NULL) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
}
|
|
||||||
int32_t code = 0;
|
|
||||||
STqSnapReader* pReader = NULL;
|
STqSnapReader* pReader = NULL;
|
||||||
|
TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
TSDB_CHECK_NULL(ppReader, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
|
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
|
||||||
if (pReader == NULL) {
|
TSDB_CHECK_NULL(pReader, code, lino, end, terrno);
|
||||||
code = terrno;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pReader->pTq = pTq;
|
pReader->pTq = pTq;
|
||||||
pReader->sver = sver;
|
pReader->sver = sver;
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
@ -54,28 +52,21 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqS
|
||||||
pTb = pTq->pOffsetStore;
|
pTb = pTq->pOffsetStore;
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _err;
|
goto end;
|
||||||
}
|
}
|
||||||
code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
|
code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
taosMemoryFree(pReader);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tdbTbcMoveToFirst(pReader->pCur);
|
code = tdbTbcMoveToFirst(pReader->pCur);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
taosMemoryFree(pReader);
|
tqInfo("vgId:%d, vnode tq snapshot reader opene success", TD_VID(pTq->pVnode));
|
||||||
goto _err;
|
*ppReader = pReader;
|
||||||
|
|
||||||
|
end:
|
||||||
|
if (code != 0){
|
||||||
|
tqError("%s failed at %d, vnode tq snapshot reader open failed since %s", __func__, lino, tstrerror(code));
|
||||||
|
taosMemoryFreeClear(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode));
|
|
||||||
|
|
||||||
*ppReader = pReader;
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
|
||||||
*ppReader = NULL;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,45 +75,37 @@ void tqSnapReaderClose(STqSnapReader** ppReader) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tdbTbcClose((*ppReader)->pCur);
|
tdbTbcClose((*ppReader)->pCur);
|
||||||
taosMemoryFree(*ppReader);
|
taosMemoryFreeClear(*ppReader);
|
||||||
*ppReader = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
if (pReader == NULL || ppData == NULL) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
}
|
void* pKey = NULL;
|
||||||
int32_t code = 0;
|
void* pVal = NULL;
|
||||||
void* pKey = NULL;
|
int32_t kLen = 0;
|
||||||
void* pVal = NULL;
|
int32_t vLen = 0;
|
||||||
int32_t kLen = 0;
|
TSDB_CHECK_NULL(pReader, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
int32_t vLen = 0;
|
TSDB_CHECK_NULL(ppData, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
|
||||||
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
code = tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen);
|
||||||
goto _exit;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
}
|
|
||||||
|
|
||||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||||
if (*ppData == NULL) {
|
TSDB_CHECK_NULL(*ppData, code, lino, end, terrno);
|
||||||
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
pHdr->type = pReader->type;
|
pHdr->type = pReader->type;
|
||||||
pHdr->size = vLen;
|
pHdr->size = vLen;
|
||||||
(void)memcpy(pHdr->data, pVal, vLen);
|
(void)memcpy(pHdr->data, pVal, vLen);
|
||||||
|
tqInfo("vgId:%d, vnode tq snapshot read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
||||||
|
|
||||||
_exit:
|
end:
|
||||||
|
if (code != 0) {
|
||||||
|
tqError("%s failed at %d, vnode tq snapshot read data failed since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tdbFree(pKey);
|
|
||||||
tdbFree(pVal);
|
|
||||||
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,135 +118,148 @@ struct STqSnapWriter {
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
|
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
|
||||||
if (pTq == NULL || ppWriter == NULL) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
}
|
|
||||||
int32_t code = 0;
|
|
||||||
STqSnapWriter* pWriter = NULL;
|
STqSnapWriter* pWriter = NULL;
|
||||||
|
|
||||||
|
TSDB_CHECK_NULL(pTq, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
if (pWriter == NULL) {
|
TSDB_CHECK_NULL(pWriter, code, lino, end, terrno);
|
||||||
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
|
||||||
;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
pWriter->pTq = pTq;
|
pWriter->pTq = pTq;
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
|
code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
|
||||||
if (code < 0) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
taosMemoryFree(pWriter);
|
tqInfo("vgId:%d, tq snapshot writer opene success", TD_VID(pTq->pVnode));
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return code;
|
|
||||||
|
|
||||||
_err:
|
end:
|
||||||
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
if (code != 0){
|
||||||
*ppWriter = NULL;
|
tqError("%s failed at %d tq snapshot writer open failed since %s", __func__, lino, tstrerror(code));
|
||||||
|
taosMemoryFreeClear(pWriter);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
if (ppWriter == NULL || *ppWriter == NULL) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
}
|
STqSnapWriter* pWriter = NULL;
|
||||||
int32_t code = 0;
|
|
||||||
STqSnapWriter* pWriter = *ppWriter;
|
TSDB_CHECK_NULL(ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
STQ* pTq = pWriter->pTq;
|
TSDB_CHECK_NULL(*ppWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
pWriter = *ppWriter;
|
||||||
|
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
} else {
|
} else {
|
||||||
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
|
||||||
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
}
|
}
|
||||||
|
tqInfo("vgId:%d, tq snapshot writer close success", TD_VID(pWriter->pTq->pVnode));
|
||||||
|
taosMemoryFreeClear(*ppWriter);
|
||||||
|
|
||||||
taosMemoryFree(pWriter);
|
end:
|
||||||
*ppWriter = NULL;
|
if (code != 0){
|
||||||
|
tqError("%s failed at %d, tq snapshot writer close failed since %s", __func__, lino, tstrerror(code));
|
||||||
return code;
|
}
|
||||||
|
|
||||||
_err:
|
|
||||||
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
static int32_t tqWriteCheck(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData){
|
||||||
if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
|
TSDB_CHECK_NULL(pWriter, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
TSDB_CHECK_NULL(pData, code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
TSDB_CHECK_CONDITION(nData >= sizeof(SSnapDataHdr), code, lino, end, TSDB_CODE_INVALID_MSG);
|
||||||
|
end:
|
||||||
|
if (code != 0){
|
||||||
|
tqError("%s failed at %d failed since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
int32_t code = 0;
|
return code;
|
||||||
STQ* pTq = pWriter->pTq;
|
}
|
||||||
|
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
SDecoder* pDecoder = &decoder;
|
SDecoder* pDecoder = &decoder;
|
||||||
STqHandle handle = {0};
|
STqHandle handle = {0};
|
||||||
|
code = tqWriteCheck(pWriter, pData, nData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
code = tDecodeSTqHandle(pDecoder, &handle);
|
code = tDecodeSTqHandle(pDecoder, &handle);
|
||||||
if (code) goto end;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pData + sizeof(SSnapDataHdr),
|
code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, strlen(handle.subKey), pData + sizeof(SSnapDataHdr),
|
||||||
nData - sizeof(SSnapDataHdr));
|
nData - sizeof(SSnapDataHdr));
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
tqInfo("vgId:%d, vnode tq snapshot write success", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
end:
|
end:
|
||||||
tDecoderClear(pDecoder);
|
tDecoderClear(pDecoder);
|
||||||
tqDestroyTqHandle(&handle);
|
tqDestroyTqHandle(&handle);
|
||||||
tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code);
|
if (code != 0){
|
||||||
|
tqError("%s failed at %d, vnode tq snapshot write failed since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
}
|
|
||||||
int32_t code = 0;
|
code = tqWriteCheck(pWriter, pData, nData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
STqCheckInfo info = {0};
|
STqCheckInfo info = {0};
|
||||||
code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
if (code != 0) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr),
|
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.topic, strlen(info.topic), pData + sizeof(SSnapDataHdr),
|
||||||
nData - sizeof(SSnapDataHdr));
|
nData - sizeof(SSnapDataHdr));
|
||||||
tDeleteSTqCheckInfo(&info);
|
tDeleteSTqCheckInfo(&info);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
tqInfo("vgId:%d, vnode tq check info write success", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
return code;
|
end:
|
||||||
|
if (code != 0){
|
||||||
_err:
|
tqError("%s failed at %d, vnode tq check info write failed since %s", __func__, lino, tstrerror(code));
|
||||||
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
if (pWriter == NULL || pData == NULL || nData < sizeof(SSnapDataHdr)) {
|
int code = TSDB_CODE_SUCCESS;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
int32_t lino = 0;
|
||||||
}
|
code = tqWriteCheck(pWriter, pData, nData);
|
||||||
int32_t code = 0;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
STQ* pTq = pWriter->pTq;
|
|
||||||
|
|
||||||
|
STQ* pTq = pWriter->pTq;
|
||||||
STqOffset info = {0};
|
STqOffset info = {0};
|
||||||
code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
if (code != 0) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr),
|
code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info.subKey, strlen(info.subKey), pData + sizeof(SSnapDataHdr),
|
||||||
nData - sizeof(SSnapDataHdr));
|
nData - sizeof(SSnapDataHdr));
|
||||||
tDeleteSTqOffset(&info);
|
tDeleteSTqOffset(&info);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
|
tqInfo("vgId:%d, vnode tq offset write success", TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
return code;
|
end:
|
||||||
|
if (code != 0){
|
||||||
_err:
|
tqError("%s failed at %d, vnode tq offset write failed since %s", __func__, lino, tstrerror(code));
|
||||||
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,40 +20,32 @@
|
||||||
|
|
||||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||||
int32_t msgBufLen) {
|
int32_t msgBufLen) {
|
||||||
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
|
||||||
SToken sToken;
|
SToken sToken = {0};
|
||||||
int32_t code = 0;
|
int code = TSDB_CODE_SUCCESS;
|
||||||
char* tbName = NULL;
|
int32_t lino = 0;
|
||||||
|
|
||||||
NEXT_TOKEN(pTableName, sToken);
|
NEXT_TOKEN(pTableName, sToken);
|
||||||
|
TSDB_CHECK_CONDITION(sToken.n != 0, code, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
|
||||||
if (sToken.n == 0) {
|
|
||||||
return buildInvalidOperationMsg(&msg, "empty table name");
|
|
||||||
}
|
|
||||||
|
|
||||||
code = insCreateSName(pName, &sToken, acctId, dbName, &msg);
|
code = insCreateSName(pName, &sToken, acctId, dbName, &msg);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
NEXT_TOKEN(pTableName, sToken);
|
NEXT_TOKEN(pTableName, sToken);
|
||||||
|
TSDB_CHECK_CONDITION(sToken.n <= 0, code, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
|
||||||
|
|
||||||
if (sToken.n > 0) {
|
end:
|
||||||
return buildInvalidOperationMsg(&msg, "table name format is wrong");
|
if (code != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
|
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
|
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
|
||||||
if (NULL == pUseCols) {
|
TSDB_CHECK_NULL(pUseCols, code, lino, end, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
pBoundInfo->numOfBound = 0;
|
pBoundInfo->numOfBound = 0;
|
||||||
int16_t lastColIdx = -1; // last column found
|
int16_t lastColIdx = -1; // last column found
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
||||||
SSmlKv* kv = taosArrayGet(cols, i);
|
SSmlKv* kv = taosArrayGet(cols, i);
|
||||||
|
@ -65,16 +57,9 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
|
||||||
index = insFindCol(&sToken, 0, t, pSchema);
|
index = insFindCol(&sToken, 0, t, pSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index < 0) {
|
TSDB_CHECK_CONDITION(index >= 0, code, lino, end, TSDB_CODE_SML_INVALID_DATA);
|
||||||
uError("smlBoundColumnData. index:%d", index);
|
TSDB_CHECK_CONDITION(!pUseCols[index], code, lino, end, TSDB_CODE_SML_INVALID_DATA);
|
||||||
code = TSDB_CODE_SML_INVALID_DATA;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
if (pUseCols[index]) {
|
|
||||||
uError("smlBoundColumnData. already set. index:%d", index);
|
|
||||||
code = TSDB_CODE_SML_INVALID_DATA;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
lastColIdx = index;
|
lastColIdx = index;
|
||||||
pUseCols[index] = true;
|
pUseCols[index] = true;
|
||||||
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
|
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
|
||||||
|
@ -82,11 +67,30 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
if (code != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
taosMemoryFree(pUseCols);
|
taosMemoryFree(pUseCols);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t smlMbsToUcs4(const char* mbs, size_t mbsLen, void** result, int32_t* resultLen, int32_t maxLen, void* charsetCxt){
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
|
void* pUcs4 = NULL;
|
||||||
|
int32_t lino = 0;
|
||||||
|
pUcs4 = taosMemoryCalloc(1, maxLen);
|
||||||
|
TSDB_CHECK_NULL(pUcs4, code, lino, end, terrno);
|
||||||
|
TSDB_CHECK_CONDITION(taosMbsToUcs4(mbs, mbsLen, (TdUcs4*)pUcs4, maxLen, resultLen, charsetCxt), code, lino, end, terrno);
|
||||||
|
*result = pUcs4;
|
||||||
|
pUcs4 = NULL;
|
||||||
|
|
||||||
|
end:
|
||||||
|
if (code != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
taosMemoryFree(pUcs4);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* @brief No json tag for schemaless
|
* @brief No json tag for schemaless
|
||||||
*
|
*
|
||||||
|
@ -99,75 +103,39 @@ end:
|
||||||
*/
|
*/
|
||||||
static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
|
static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
|
||||||
SMsgBuf* msg, void* charsetCxt) {
|
SMsgBuf* msg, void* charsetCxt) {
|
||||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
int code = TSDB_CODE_SUCCESS;
|
||||||
if (!pTagArray) {
|
int32_t lino = 0;
|
||||||
return terrno;
|
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||||
}
|
TSDB_CHECK_NULL(pTagArray, code, lino, end, terrno);
|
||||||
*tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
*tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||||
if (!*tagName) {
|
TSDB_CHECK_NULL(*tagName, code, lino, end, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
for (int i = 0; i < tags->numOfBound; ++i) {
|
for (int i = 0; i < tags->numOfBound; ++i) {
|
||||||
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
||||||
SSmlKv* kv = taosArrayGet(cols, i);
|
SSmlKv* kv = taosArrayGet(cols, i);
|
||||||
if (kv == NULL){
|
TSDB_CHECK_NULL(kv, code, lino, end, terrno);
|
||||||
code = terrno;
|
bool cond = (kv->keyLen == strlen(pTagSchema->name) && memcmp(kv->key, pTagSchema->name, kv->keyLen) == 0 && kv->type == pTagSchema->type);
|
||||||
uError("SML smlBuildTagRow error kv is null");
|
TSDB_CHECK_CONDITION(cond, code, lino, end, TSDB_CODE_SML_INVALID_DATA);
|
||||||
goto end;
|
TSDB_CHECK_NULL(taosArrayPush(*tagName, pTagSchema->name), code, lino, end, terrno);
|
||||||
}
|
|
||||||
if (kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 ||
|
|
||||||
kv->type != pTagSchema->type) {
|
|
||||||
code = TSDB_CODE_SML_INVALID_DATA;
|
|
||||||
uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayPush(*tagName, pTagSchema->name) == NULL){
|
|
||||||
code = terrno;
|
|
||||||
uError("SML smlBuildTagRow error push tag name");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||||
// strcpy(val.colName, pTagSchema->name);
|
|
||||||
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY ||
|
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY || pTagSchema->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
|
pTagSchema->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
val.pData = (uint8_t*)kv->value;
|
val.pData = (uint8_t*)kv->value;
|
||||||
val.nData = kv->length;
|
val.nData = kv->length;
|
||||||
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t output = 0;
|
code = smlMbsToUcs4(kv->value, kv->length, (void**)&val.pData, &val.nData, kv->length * TSDB_NCHAR_SIZE, charsetCxt);
|
||||||
void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
if (p == NULL) {
|
|
||||||
code = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output, charsetCxt)) {
|
|
||||||
if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
|
|
||||||
taosMemoryFree(p);
|
|
||||||
code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
char buf[512] = {0};
|
|
||||||
(void)snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(terrno));
|
|
||||||
taosMemoryFree(p);
|
|
||||||
code = buildSyntaxErrMsg(msg, buf, kv->value);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
val.pData = p;
|
|
||||||
val.nData = output;
|
|
||||||
} else {
|
} else {
|
||||||
(void)memcpy(&val.i64, &(kv->value), kv->length);
|
(void)memcpy(&val.i64, &(kv->value), kv->length);
|
||||||
}
|
}
|
||||||
if (taosArrayPush(pTagArray, &val) == NULL){
|
TSDB_CHECK_NULL(taosArrayPush(pTagArray, &val), code, lino, end, terrno);
|
||||||
code = terrno;
|
|
||||||
uError("SML smlBuildTagRow error push tag array");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tTagNew(pTagArray, 1, false, ppTag);
|
code = tTagNew(pTagArray, 1, false, ppTag);
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
if (code != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
||||||
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
|
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
|
||||||
if (p->type == TSDB_DATA_TYPE_NCHAR) {
|
if (p->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
@ -179,18 +147,20 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt) {
|
int32_t smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta, STableDataCxt** cxt) {
|
||||||
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
SVCreateTbReq* pCreateTbReq = NULL;
|
SVCreateTbReq* pCreateTbReq = NULL;
|
||||||
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
|
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
|
||||||
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, cxt, false, false);
|
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, cxt, false, false);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = initTableColSubmitData(*cxt);
|
ret = initTableColSubmitData(*cxt);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
return ret;
|
|
||||||
|
end:
|
||||||
|
if (ret != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearColValArraySml(SArray* pCols) {
|
void clearColValArraySml(SArray* pCols) {
|
||||||
|
@ -207,78 +177,51 @@ void clearColValArraySml(SArray* pCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBuildRow(STableDataCxt* pTableCxt) {
|
int32_t smlBuildRow(STableDataCxt* pTableCxt) {
|
||||||
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
if (pRow == NULL){
|
int32_t lino = 0;
|
||||||
return terrno;
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
}
|
TSDB_CHECK_NULL(pRow, ret, lino, end, terrno);
|
||||||
int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
if (TSDB_CODE_SUCCESS != ret) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
SRowKey key;
|
SRowKey key;
|
||||||
tRowGetKey(*pRow, &key);
|
tRowGetKey(*pRow, &key);
|
||||||
insCheckTableDataOrder(pTableCxt, &key);
|
insCheckTableDataOrder(pTableCxt, &key);
|
||||||
return TSDB_CODE_SUCCESS;
|
end:
|
||||||
|
if (ret != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index, void* charsetCxt) {
|
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32_t index, void* charsetCxt) {
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
SSchema* pColSchema = schema + index;
|
SSchema* pColSchema = schema + index;
|
||||||
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
|
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
|
||||||
if (pVal == NULL) {
|
TSDB_CHECK_NULL(pVal, ret, lino, end, TSDB_CODE_SUCCESS);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
SSmlKv* kv = (SSmlKv*)data;
|
SSmlKv* kv = (SSmlKv*)data;
|
||||||
if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 ||
|
if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 ||
|
||||||
kv->type != pColSchema->type) {
|
kv->type != pColSchema->type) {
|
||||||
ret = TSDB_CODE_SML_INVALID_DATA;
|
ret = TSDB_CODE_SML_INVALID_DATA;
|
||||||
char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1);
|
char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1);
|
||||||
if (tmp) {
|
TSDB_CHECK_NULL(tmp, ret, lino, end, terrno);
|
||||||
(void)memcpy(tmp, kv->key, kv->keyLen);
|
(void)memcpy(tmp, kv->key, kv->keyLen);
|
||||||
uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name,
|
uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name,
|
||||||
pColSchema->name, tDataTypes[pColSchema->type].name);
|
pColSchema->name, tDataTypes[pColSchema->type].name);
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
} else {
|
|
||||||
uError("SML smlBuildCol out of memory");
|
|
||||||
ret = terrno;
|
|
||||||
}
|
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t len = 0;
|
ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, &pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt);
|
||||||
int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE;
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
if (size <= 0) {
|
|
||||||
ret = TSDB_CODE_SML_INVALID_DATA;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
char* pUcs4 = taosMemoryCalloc(1, size);
|
|
||||||
if (NULL == pUcs4) {
|
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, size, &len, charsetCxt)) {
|
|
||||||
if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
|
|
||||||
taosMemoryFree(pUcs4);
|
|
||||||
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
taosMemoryFree(pUcs4);
|
|
||||||
ret = TSDB_CODE_TSC_INVALID_VALUE;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
pVal->value.pData = pUcs4;
|
|
||||||
pVal->value.nData = len;
|
|
||||||
} else if (kv->type == TSDB_DATA_TYPE_BINARY) {
|
} else if (kv->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
pVal->value.nData = kv->length;
|
pVal->value.nData = kv->length;
|
||||||
pVal->value.pData = (uint8_t*)kv->value;
|
pVal->value.pData = (uint8_t*)kv->value;
|
||||||
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) {
|
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) {
|
||||||
pVal->value.nData = kv->length;
|
pVal->value.nData = kv->length;
|
||||||
pVal->value.pData = taosMemoryMalloc(kv->length);
|
pVal->value.pData = taosMemoryMalloc(kv->length);
|
||||||
if (!pVal->value.pData) {
|
TSDB_CHECK_NULL(pVal->value.pData, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
uError("SML smlBuildCol malloc failed %s:%d, err: %s", __func__, __LINE__, tstrerror(ret));
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
(void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
|
(void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
|
||||||
} else {
|
} else {
|
||||||
(void)memcpy(&pVal->value.val, &(kv->value), kv->length);
|
(void)memcpy(&pVal->value.val, &(kv->value), kv->length);
|
||||||
|
@ -286,12 +229,17 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
|
||||||
pVal->flag = CV_FLAG_VALUE;
|
pVal->flag = CV_FLAG_VALUE;
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
if (ret != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols,
|
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols,
|
||||||
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
|
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
|
||||||
char* msgBuf, int32_t msgBufLen, void* charsetCxt) {
|
char* msgBuf, int32_t msgBufLen, void* charsetCxt) {
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t ret = 0;
|
||||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
|
|
||||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||||
|
@ -299,50 +247,32 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
SVCreateTbReq* pCreateTblReq = NULL;
|
SVCreateTbReq* pCreateTblReq = NULL;
|
||||||
SArray* tagName = NULL;
|
SArray* tagName = NULL;
|
||||||
|
|
||||||
int ret = insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags);
|
ret = insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "init bound cols error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true);
|
ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "bound tags error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
|
|
||||||
ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf, charsetCxt);
|
ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf, charsetCxt);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||||
if (NULL == pCreateTblReq) {
|
TSDB_CHECK_NULL(pCreateTblReq, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
ret = insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
|
ret = insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
|
||||||
pTableMeta->tableInfo.numOfTags, ttl);
|
pTableMeta->tableInfo.numOfTags, ttl);
|
||||||
if (TSDB_CODE_SUCCESS != ret) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
|
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
|
||||||
if (pCreateTblReq->ctb.stbName == NULL){
|
TSDB_CHECK_NULL(pCreateTblReq->ctb.stbName, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
(void)memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
(void)memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
||||||
|
|
||||||
if (dataFormat) {
|
if (dataFormat) {
|
||||||
STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj,
|
STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj,
|
||||||
&pTableMeta->uid, sizeof(pTableMeta->uid));
|
&pTableMeta->uid, sizeof(pTableMeta->uid));
|
||||||
if (NULL == pTableCxt) {
|
TSDB_CHECK_NULL(pTableCxt, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
(*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
|
(*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
(*pTableCxt)->pData->pCreateTbReq = pCreateTblReq;
|
(*pTableCxt)->pData->pCreateTbReq = pCreateTblReq;
|
||||||
(*pTableCxt)->pMeta->uid = pTableMeta->uid;
|
(*pTableCxt)->pMeta->uid = pTableMeta->uid;
|
||||||
|
@ -354,86 +284,47 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
STableDataCxt* pTableCxt = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
|
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
|
||||||
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false);
|
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||||
ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false);
|
ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "bound cols error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = initTableColSubmitData(pTableCxt);
|
ret = initTableColSubmitData(pTableCxt);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t rowNum = taosArrayGetSize(cols);
|
int32_t rowNum = taosArrayGetSize(cols);
|
||||||
if (rowNum <= 0) {
|
TSDB_CHECK_CONDITION(rowNum > 0, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t r = 0; r < rowNum; ++r) {
|
for (int32_t r = 0; r < rowNum; ++r) {
|
||||||
void* rowData = taosArrayGetP(cols, r);
|
void* rowData = taosArrayGetP(cols, r);
|
||||||
if (rowData == NULL) {
|
TSDB_CHECK_NULL(rowData, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
// 1. set the parsed value from sql string
|
// 1. set the parsed value from sql string
|
||||||
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
|
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
|
||||||
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
|
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
|
||||||
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
|
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
|
||||||
if (pVal == NULL) {
|
TSDB_CHECK_NULL(pVal, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SSmlKv* kv = *(SSmlKv**)p;
|
SSmlKv* kv = *(SSmlKv**)p;
|
||||||
if (kv->type != pColSchema->type) {
|
TSDB_CHECK_CONDITION(kv->type == pColSchema->type, ret, lino, end, TSDB_CODE_TSC_INVALID_OPERATION);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
||||||
}
|
}
|
||||||
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t len = 0;
|
ret = smlMbsToUcs4(kv->value, kv->length, (void**)&pVal->value.pData, &pVal->value.nData, pColSchema->bytes - VARSTR_HEADER_SIZE, charsetCxt);
|
||||||
char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
if (NULL == pUcs4) {
|
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len, charsetCxt)) {
|
|
||||||
if (terrno == TAOS_SYSTEM_ERROR(E2BIG)) {
|
|
||||||
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length,
|
|
||||||
pColSchema->bytes, kv->value);
|
|
||||||
(void)buildInvalidOperationMsg(&pBuf, "value too long");
|
|
||||||
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
ret = buildInvalidOperationMsg(&pBuf, strerror(terrno));
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
pVal->value.pData = pUcs4;
|
|
||||||
pVal->value.nData = len;
|
|
||||||
} else if (kv->type == TSDB_DATA_TYPE_BINARY) {
|
} else if (kv->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
pVal->value.nData = kv->length;
|
pVal->value.nData = kv->length;
|
||||||
pVal->value.pData = (uint8_t*)kv->value;
|
pVal->value.pData = (uint8_t*)kv->value;
|
||||||
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) {
|
} else if (kv->type == TSDB_DATA_TYPE_GEOMETRY || kv->type == TSDB_DATA_TYPE_VARBINARY) {
|
||||||
pVal->value.nData = kv->length;
|
pVal->value.nData = kv->length;
|
||||||
pVal->value.pData = taosMemoryMalloc(kv->length);
|
pVal->value.pData = taosMemoryMalloc(kv->length);
|
||||||
if (NULL == pVal->value.pData) {
|
TSDB_CHECK_NULL(pVal->value.pData, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
(void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
|
(void)memcpy(pVal->value.pData, (uint8_t*)kv->value, kv->length);
|
||||||
} else {
|
} else {
|
||||||
(void)memcpy(&pVal->value.val, &(kv->value), kv->length);
|
(void)memcpy(&pVal->value.val, &(kv->value), kv->length);
|
||||||
|
@ -442,22 +333,20 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
|
||||||
}
|
}
|
||||||
|
|
||||||
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
if (NULL == pRow) {
|
TSDB_CHECK_NULL(pRow, ret, lino, end, terrno);
|
||||||
ret = terrno;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
if (TSDB_CODE_SUCCESS != ret) {
|
TSDB_CHECK_CODE(ret, lino, end);
|
||||||
ret = buildInvalidOperationMsg(&pBuf, "tRowBuild error");
|
SRowKey key = {0};
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
SRowKey key;
|
|
||||||
tRowGetKey(*pRow, &key);
|
tRowGetKey(*pRow, &key);
|
||||||
insCheckTableDataOrder(pTableCxt, &key);
|
insCheckTableDataOrder(pTableCxt, &key);
|
||||||
clearColValArraySml(pTableCxt->pValues);
|
clearColValArraySml(pTableCxt->pValues);
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
if (ret != 0){
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(ret));
|
||||||
|
buildInvalidOperationMsg(&pBuf, tstrerror(ret));
|
||||||
|
}
|
||||||
insDestroyBoundColInfo(&bindTags);
|
insDestroyBoundColInfo(&bindTags);
|
||||||
tdDestroySVCreateTbReq(pCreateTblReq);
|
tdDestroySVCreateTbReq(pCreateTblReq);
|
||||||
taosMemoryFree(pCreateTblReq);
|
taosMemoryFree(pCreateTblReq);
|
||||||
|
@ -466,29 +355,21 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlInitHandle(SQuery** query) {
|
int32_t smlInitHandle(SQuery** query) {
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t code = 0;
|
||||||
*query = NULL;
|
*query = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SVnodeModifyOpStmt* stmt = NULL;
|
SVnodeModifyOpStmt* stmt = NULL;
|
||||||
|
|
||||||
int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
|
code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
|
||||||
if (code != 0) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
uError("SML create pQuery error");
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
pQuery->haveResultSet = false;
|
pQuery->haveResultSet = false;
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
|
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
|
||||||
if (code != 0) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
uError("SML create SVnodeModifyOpStmt error");
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (stmt->pTableBlockHashObj == NULL){
|
TSDB_CHECK_NULL(stmt->pTableBlockHashObj, code, lino, end, terrno);
|
||||||
uError("SML create pTableBlockHashObj error");
|
|
||||||
code = terrno;
|
|
||||||
goto END;
|
|
||||||
}
|
|
||||||
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
|
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
|
||||||
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
|
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
|
||||||
|
|
||||||
|
@ -496,24 +377,28 @@ int32_t smlInitHandle(SQuery** query) {
|
||||||
*query = pQuery;
|
*query = pQuery;
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
END:
|
end:
|
||||||
|
if (code != 0) {
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
nodesDestroyNode((SNode*)stmt);
|
nodesDestroyNode((SNode*)stmt);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
|
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
|
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
|
||||||
// merge according to vgId
|
code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true);
|
||||||
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true);
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("insMergeTableDataCxt failed");
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
|
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(code, lino, end);
|
||||||
uError("insBuildVgDataBlocks failed");
|
|
||||||
return code;
|
end:
|
||||||
|
if (code != 0) {
|
||||||
|
uError("%s failed at %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -405,8 +405,6 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4
|
||||||
size_t ucs4_input_len = mbsLength;
|
size_t ucs4_input_len = mbsLength;
|
||||||
size_t outLeft = ucs4_max_len;
|
size_t outLeft = ucs4_max_len;
|
||||||
if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) {
|
if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) {
|
||||||
char buf[512] = {0};
|
|
||||||
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s %d %d", strerror(terrno), errno, EILSEQ);
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosReleaseConv(idx, conv, M2C, charsetCxt);
|
taosReleaseConv(idx, conv, M2C, charsetCxt);
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue