Merge pull request #25399 from taosdata/opti/TS-4658

opti:add local thread error msg array
This commit is contained in:
dapan1121 2024-04-19 18:23:39 +08:00 committed by GitHub
commit 312ca43b9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 80 additions and 23 deletions

View File

@ -152,7 +152,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int32_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);

View File

@ -33,8 +33,16 @@ extern "C" {
const char* tstrerror(int32_t err);
const char* terrstr();
#define ERR_MSG_LEN 256
char* taosGetErrMsgReturn();
char* taosGetErrMsg();
int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define terrMsg (taosGetErrMsg())
#define SET_ERROR_MSG(MSG, ...) \
snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__)
#define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error

View File

@ -1486,7 +1486,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1570,7 +1570,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -1608,6 +1608,7 @@ static void* getRawDataFromRes(void* pRetrieve) {
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
return terrno;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1620,6 +1621,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
SET_ERROR_MSG("pRequest is NULL");
return terrno;
}
@ -1636,6 +1638,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tDecoderInit(&decoder, data, dataLen);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp);
if (code != 0) {
SET_ERROR_MSG("decode mq data rsp failed");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
@ -1648,6 +1651,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get handle failed");
goto end;
}
@ -1660,6 +1664,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
pQuery = smlInitHandle();
if (pQuery == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
SET_ERROR_MSG("init sml handle failed");
goto end;
}
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
@ -1671,6 +1676,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter);
if (!tbName) {
SET_ERROR_MSG("block tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
@ -1686,12 +1692,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s meta failed", tbName);
goto end;
}
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s vgroup failed", tbName);
goto end;
}
@ -1703,6 +1711,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
SET_ERROR_MSG("calloc fields failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int i = 0; i < pSW->nCols; i++) {
@ -1711,16 +1721,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true);
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
}
taosMemoryFreeClear(pTableMeta);
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("sml build output failed");
goto end;
}
@ -1742,6 +1754,7 @@ end:
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
return terrno;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1755,6 +1768,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
SET_ERROR_MSG("pRequest is NULL");
return terrno;
}
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
@ -1771,6 +1785,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
code = TSDB_CODE_INVALID_MSG;
goto end;
}
@ -1783,6 +1798,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get handle failed");
goto end;
}
@ -1794,6 +1810,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
pQuery = smlInitHandle();
if (pQuery == NULL) {
SET_ERROR_MSG("init sml handle failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
@ -1808,6 +1825,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter);
if (!tbName) {
SET_ERROR_MSG("block tbname is null");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
@ -1829,6 +1847,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
code = TSDB_CODE_TMQ_INVALID_MSG;
SET_ERROR_MSG("decode create table:%s req failed", tbName);
goto end;
}
@ -1836,6 +1855,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
code = TSDB_CODE_TSC_INVALID_VALUE;
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
SET_ERROR_MSG("create table req type is not child table: %s, type: %d", tbName, pCreateReq.type);
goto end;
}
if (strcmp(tbName, pCreateReq.name) == 0) {
@ -1852,6 +1872,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SVgroupInfo vg;
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s vgroup failed", tbName);
goto end;
}
@ -1865,6 +1886,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
// continue;
// }
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("cata log get table:%s meta failed", tbName);
goto end;
}
@ -1881,6 +1903,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if (fields == NULL) {
SET_ERROR_MSG("calloc fields failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int i = 0; i < pSW->nCols; i++) {
@ -1889,12 +1913,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
void* rawData = getRawDataFromRes(pRetrieve);
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true);
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
}
taosMemoryFreeClear(pTableMeta);
}
code = smlBuildOutput(pQuery, pVgHash);
@ -2056,6 +2081,7 @@ void tmq_free_raw(tmq_raw_data raw) {
if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
taosMemoryFree(raw.raw);
}
memset(terrMsg, 0, ERR_MSG_LEN);
}
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {

View File

@ -1056,17 +1056,17 @@ static void tmqMgmtInit(void) {
}
}
#define SET_ERROR_MSG(MSG) \
#define SET_ERROR_MSG_TMQ(MSG) \
if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (conf == NULL) {
SET_ERROR_MSG("configure is null")
SET_ERROR_MSG_TMQ("configure is null")
return NULL;
}
taosThreadOnce(&tmqInit, tmqMgmtInit);
if (tmqInitRes != 0) {
terrno = tmqInitRes;
SET_ERROR_MSG("tmq timer init error")
SET_ERROR_MSG_TMQ("tmq timer init error")
return NULL;
}
@ -1074,7 +1074,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
SET_ERROR_MSG("malloc tmq failed")
SET_ERROR_MSG_TMQ("malloc tmq failed")
return NULL;
}
@ -1090,7 +1090,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
SET_ERROR_MSG("malloc tmq element failed or group is empty")
SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty")
goto _failed;
}
@ -1123,7 +1123,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
SET_ERROR_MSG("init t_sem failed")
SET_ERROR_MSG_TMQ("init t_sem failed")
goto _failed;
}
@ -1132,13 +1132,13 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq->pTscObj == NULL) {
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
SET_ERROR_MSG("init tscObj failed")
SET_ERROR_MSG_TMQ("init tscObj failed")
goto _failed;
}
pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
if (pTmq->refId < 0) {
SET_ERROR_MSG("add tscObj ref failed")
SET_ERROR_MSG_TMQ("add tscObj ref failed")
goto _failed;
}
@ -2191,7 +2191,12 @@ const char* tmq_err2str(int32_t err) {
} else if (err == -1) {
return "fail";
} else {
if(*(taosGetErrMsg()) == 0){
return tstrerror(err);
} else{
snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s,detail:%s", tstrerror(err), taosGetErrMsg());
return (const char*)taosGetErrMsgReturn();
}
}
}

View File

@ -641,7 +641,7 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
}
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) {
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) {
void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL;
@ -662,8 +662,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
}
char* p = (char*)data;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
// column length |
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each column length |
int32_t version = *(int32_t*)data;
p += sizeof(int32_t);
p += sizeof(int32_t);
@ -689,12 +688,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
if (tFields != NULL && numFields != numOfCols) {
uError("numFields:%d != numOfCols:%d", numFields, numOfCols);
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d != raw numOfCols:%d", numFields, numOfCols);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (tFields != NULL && numFields > boundInfo->numOfBound) {
uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
@ -703,7 +702,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
if (errstr != NULL) snprintf(errstr, errstrLen, "type or bytes not equal, id:%d, type:%d, raw type:%d. bytes:%d, raw bytes:%d",
pColSchema->colId, pColSchema->type, *fields, pColSchema->bytes, *(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
@ -732,7 +732,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pColSchema = &pSchema[j];
if (strcmp(pColSchema->name, tFields[i].name) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
if (errstr != NULL) snprintf(errstr, errstrLen, "type or bytes not equal, id:%d, type:%d, raw type:%d. bytes:%d, raw bytes:%d",
pColSchema->colId, pColSchema->type, *fields, pColSchema->bytes, *(int32_t*)(fields + sizeof(int8_t)));
ret = TSDB_CODE_INVALID_PARA;
goto end;
}

View File

@ -27,8 +27,12 @@ typedef struct {
} STaosError;
static threadlocal int32_t tsErrno;
static threadlocal char tsErrMsgDetail[ERR_MSG_LEN] = {0};
static threadlocal char tsErrMsgReturn[ERR_MSG_LEN] = {0};
int32_t* taosGetErrno() { return &tsErrno; }
char* taosGetErrMsg() { return tsErrMsgDetail; }
char* taosGetErrMsgReturn() { return tsErrMsgReturn; }
#ifdef TAOS_ERROR_C
#define TAOS_DEFINE_ERROR(name, msg) {.val = (name), .str = (msg)},

View File

@ -1000,6 +1000,17 @@ void testConsumeExcluded(int topic_type){
}
taos_free_result(pRes);
}
void testDetailError(){
tmq_raw_data raw = {0};
raw.raw_type = 2;
int32_t code = tmq_write_raw((TAOS *)1, raw);
ASSERT(code);
const char *err = tmq_err2str(code);
char* tmp = strstr(err, "Invalid parameters,detail:taos:0x1 or data");
ASSERT(tmp != NULL);
}
int main(int argc, char* argv[]) {
for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-c") == 0) {
@ -1036,5 +1047,6 @@ int main(int argc, char* argv[]) {
testConsumeExcluded(1);
testConsumeExcluded(2);
testDetailError();
taosCloseFile(&g_fp);
}