Merge pull request #27354 from taosdata/enh/TD-31535/assert

Enh/td 31535/assert
This commit is contained in:
dapan1121 2024-08-23 13:23:45 +08:00 committed by GitHub
commit fb56d67633
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 123 additions and 69 deletions

View File

@ -702,6 +702,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) #define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B)
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C) #define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C)
#define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D) #define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D)
#define TSDB_CODE_TQ_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0A0E)
// wal // wal
// #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) // 2.x // #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) // 2.x

View File

@ -413,7 +413,11 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
return NULL; return NULL;
} }
ASSERT(size > *offset); if((size <= *offset)){
tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return NULL;
}
char* pCont = NULL; char* pCont = NULL;
int64_t totalSize = 0; int64_t totalSize = 0;
if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) { if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
@ -506,6 +510,7 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs
} }
SEpSet ep = getEpSet_s(&pInst->mgmtEp); SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size); char* data = readFile(pFile, offset, size);
if(data == NULL) return terrno;
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName, return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
pInst->pTransporter, &ep); pInst->pTransporter, &ep);
} }

View File

@ -1657,7 +1657,6 @@ static void* getRawDataFromRes(void* pRetrieve) {
} else if (*(int64_t*)pRetrieve == 1) { } else if (*(int64_t*)pRetrieve == 1) {
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
} }
ASSERT(rawData != NULL);
return rawData; return rawData;
} }

View File

@ -360,7 +360,10 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
} else { } else {
sMeta = *tmp; sMeta = *tmp;
} }
ASSERT(sMeta != NULL); if (sMeta == NULL) {
uError("smlProcessSuperTable failed to get super table meta");
return TSDB_CODE_SML_INTERNAL_ERROR;
}
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = sMeta->tableMeta;
info->maxTagKVs = sMeta->tags; info->maxTagKVs = sMeta->tags;
info->maxColKVs = sMeta->cols; info->maxColKVs = sMeta->cols;
@ -424,7 +427,10 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
} else { } else {
tinfo = *oneTable; tinfo = *oneTable;
} }
ASSERT(tinfo != NULL); if (tinfo == NULL) {
uError("smlProcessChildTable failed to get child table info");
return TSDB_CODE_SML_INTERNAL_ERROR;
}
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -556,7 +562,11 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
if (dst == NULL) { if (dst == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN); if(oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN){
uError("SML:smlSetCTableName super table name is too long");
taosArrayDestroy(dst);
return TSDB_CODE_SML_INTERNAL_ERROR;
}
char superName[TSDB_TABLE_NAME_LEN] = {0}; char superName[TSDB_TABLE_NAME_LEN] = {0};
RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName}; RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
if (tsSmlDot2Underline) { if (tsSmlDot2Underline) {

View File

@ -437,7 +437,11 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
} }
(void)memcpy(tmp, kv.value, kv.length); (void)memcpy(tmp, kv.value, kv.length);
PROCESS_SLASH_IN_FIELD_VALUE(tmp, kv.length); PROCESS_SLASH_IN_FIELD_VALUE(tmp, kv.length);
ASSERT(kv.type != TSDB_DATA_TYPE_GEOMETRY); if(kv.type == TSDB_DATA_TYPE_GEOMETRY) {
uError("SML:0x%" PRIx64 " smlParseColLine error, invalid GEOMETRY type.", info->id);
taosMemoryFree((void*)kv.value);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(kv.type == TSDB_DATA_TYPE_VARBINARY){ if(kv.type == TSDB_DATA_TYPE_VARBINARY){
taosMemoryFree((void*)kv.value); taosMemoryFree((void*)kv.value);
} }

View File

@ -1576,7 +1576,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto END; goto END;
} }
ASSERT(msgEpoch == clientEpoch); if(msgEpoch != clientEpoch) {
tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
goto END;
}
// handle meta rsp // handle meta rsp
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
pRspWrapper->tmqRspType = rspType; pRspWrapper->tmqRspType = rspType;
@ -1874,7 +1879,10 @@ void changeByteEndian(char* pData) {
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length | version: // length | version:
int32_t blockVersion = *(int32_t*)p; int32_t blockVersion = *(int32_t*)p;
ASSERT(blockVersion == BLOCK_VERSION_1); if(blockVersion != BLOCK_VERSION_1) {
tscError("invalid block version:%d", blockVersion);
return;
}
*(int32_t*)p = BLOCK_VERSION_2; *(int32_t*)p = BLOCK_VERSION_2;
p += sizeof(int32_t); p += sizeof(int32_t);

View File

@ -44,7 +44,10 @@ void printSubResults(void* pRes, int32_t* totalRows) {
} }
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS_FIELD* fields = taos_fetch_fields(pRes);
assert(fields != NULL); if(fields == NULL) {
std::cout << "fields is null" << std::endl;
break;
}
int32_t numOfFields = taos_field_count(pRes); int32_t numOfFields = taos_field_count(pRes);
int32_t precision = taos_result_precision(pRes); int32_t precision = taos_result_precision(pRes);
(void)taos_print_row(buf, row, fields, numOfFields); (void)taos_print_row(buf, row, fields, numOfFields);
@ -61,7 +64,6 @@ void showDB(TAOS* pConn) {
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
assert(pFields != NULL);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; char str[512] = {0};
@ -173,7 +175,6 @@ void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t start
i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18, i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18,
pVarchar, startTs + 19, i + 19, pVarchar); pVarchar, startTs + 19, i + 19, pVarchar);
TAOS_RES* p = taos_query(pConn, sql); TAOS_RES* p = taos_query(pConn, sql);
assert(p != NULL);
if (taos_errno(p) != 0) { if (taos_errno(p) != 0) {
(void)printf("failed to insert data, reason:%s\n", taos_errstr(p)); (void)printf("failed to insert data, reason:%s\n", taos_errstr(p));
} }
@ -223,9 +224,7 @@ void tmq_commit_cb_print(tmq_t* pTmq, int32_t code, void* param) {
void* doConsumeData(void* param) { void* doConsumeData(void* param) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
assert(conf != NULL);
(void)tmq_conf_set(conf, "enable.auto.commit", "true"); (void)tmq_conf_set(conf, "enable.auto.commit", "true");
(void)tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); (void)tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
(void)tmq_conf_set(conf, "group.id", "cgrpName41"); (void)tmq_conf_set(conf, "group.id", "cgrpName41");
@ -237,12 +236,10 @@ void* doConsumeData(void* param) {
(void)tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); (void)tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq != NULL);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
// 创建订阅 topics 列表 // 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
assert(topicList != NULL);
(void)tmq_list_append(topicList, "topic_t2"); (void)tmq_list_append(topicList, "topic_t2");
// 启动订阅 // 启动订阅
@ -279,7 +276,6 @@ void* doConsumeData(void* param) {
} }
fields = taos_fetch_fields(pRes); fields = taos_fetch_fields(pRes);
assert(fields != NULL);
numOfFields = taos_field_count(pRes); numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes); precision = taos_result_precision(pRes);
(void)taos_print_row(buf, row, fields, numOfFields); (void)taos_print_row(buf, row, fields, numOfFields);
@ -336,7 +332,7 @@ TEST(clientCase, connect_Test) {
TEST(clientCase, create_user_Test) { TEST(clientCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
@ -349,7 +345,7 @@ TEST(clientCase, create_user_Test) {
TEST(clientCase, create_account_Test) { TEST(clientCase, create_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
@ -362,7 +358,7 @@ TEST(clientCase, create_account_Test) {
TEST(clientCase, drop_account_Test) { TEST(clientCase, drop_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
@ -375,13 +371,13 @@ TEST(clientCase, drop_account_Test) {
TEST(clientCase, show_user_Test) { TEST(clientCase, show_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "show users"); TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
assert(pFields != NULL); ASSERT_NE(pFields, nullptr);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; char str[512] = {0};
@ -396,7 +392,7 @@ TEST(clientCase, show_user_Test) {
TEST(clientCase, drop_user_Test) { TEST(clientCase, drop_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "drop user abc"); TAOS_RES* pRes = taos_query(pConn, "drop user abc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
@ -409,13 +405,13 @@ TEST(clientCase, drop_user_Test) {
TEST(clientCase, show_db_Test) { TEST(clientCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "show databases"); TAOS_RES* pRes = taos_query(pConn, "show databases");
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
assert(pFields != NULL); ASSERT_NE(pFields, nullptr);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0}; char str[512] = {0};
@ -429,7 +425,7 @@ TEST(clientCase, show_db_Test) {
TEST(clientCase, create_db_Test) { TEST(clientCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -454,7 +450,7 @@ TEST(clientCase, create_db_Test) {
TEST(clientCase, create_dnode_Test) { TEST(clientCase, create_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -473,7 +469,7 @@ TEST(clientCase, create_dnode_Test) {
TEST(clientCase, drop_dnode_Test) { TEST(clientCase, drop_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); TAOS_RES* pRes = taos_query(pConn, "drop dnode 3");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -498,7 +494,7 @@ TEST(clientCase, drop_dnode_Test) {
TEST(clientCase, use_db_test) { TEST(clientCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -517,7 +513,7 @@ TEST(clientCase, use_db_test) {
TEST(clientCase, create_stable_Test) { TEST(clientCase, create_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -545,7 +541,7 @@ TEST(clientCase, create_stable_Test) {
TEST(clientCase, create_table_Test) { TEST(clientCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
@ -564,7 +560,7 @@ TEST(clientCase, create_table_Test) {
TEST(clientCase, create_ctable_Test) { TEST(clientCase, create_ctable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -589,7 +585,7 @@ TEST(clientCase, create_ctable_Test) {
TEST(clientCase, show_stable_Test) { TEST(clientCase, show_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != nullptr); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -614,7 +610,7 @@ TEST(clientCase, show_stable_Test) {
TEST(clientCase, show_vgroup_Test) { TEST(clientCase, show_vgroup_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -716,7 +712,7 @@ TEST(clientCase, create_multiple_tables) {
TEST(clientCase, show_table_Test) { TEST(clientCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "show tables"); TAOS_RES* pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -760,7 +756,7 @@ TEST(clientCase, generated_request_id_test) {
if (result != nullptr) { if (result != nullptr) {
// (void)printf("0x%llx, index:%d\n", v, i); // (void)printf("0x%llx, index:%d\n", v, i);
} }
assert(result == nullptr); ASSERT_EQ(result, nullptr);
(void)taosHashPut(phash, &v, sizeof(v), NULL, 0); (void)taosHashPut(phash, &v, sizeof(v), NULL, 0);
} }
@ -1090,7 +1086,7 @@ TEST(clientCase, sub_db_test) {
if (row == NULL) break; if (row == NULL) break;
fields = taos_fetch_fields(pRes); fields = taos_fetch_fields(pRes);
assert(fields != NULL); ASSERT_NE(fields, nullptr);
numOfFields = taos_field_count(pRes); numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes); precision = taos_result_precision(pRes);
rows++; rows++;
@ -1226,7 +1222,7 @@ TEST(clientCase, td_25129) {
(void)tmq_conf_set(conf, "msg.with.table.name", "true"); (void)tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq != NULL); ASSERT_NE(tmq, nullptr);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
char topicName[128] = "tp"; char topicName[128] = "tp";
@ -1393,7 +1389,7 @@ TEST(clientCase, sub_tb_test) {
(void)tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); (void)tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq != NULL); ASSERT_NE(tmq, nullptr);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
// 创建订阅 topics 列表 // 创建订阅 topics 列表

View File

@ -40,7 +40,7 @@ TEST(testCase, driverInit_Test) {
TEST(testCase, create_topic_ctb_Test) { TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -64,7 +64,7 @@ TEST(testCase, create_topic_ctb_Test) {
TEST(testCase, create_topic_stb_Test) { TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -90,7 +90,7 @@ TEST(testCase, create_topic_stb_Test) {
#if 0 #if 0
TEST(testCase, tmq_subscribe_ctb_Test) { TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -116,7 +116,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
TEST(testCase, tmq_subscribe_stb_Test) { TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); ASSERT_NE(pConn, NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {

View File

@ -53,7 +53,6 @@ void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
assert(pReq != NULL);
(void)tSerializeSCreateDbReq(pReq, contLen, &createReq); (void)tSerializeSCreateDbReq(pReq, contLen, &createReq);
*pContLen = contLen; *pContLen = contLen;
@ -69,7 +68,6 @@ void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql,
int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq); int32_t contLen = tSerializeSCMCreateTopicReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
assert(pReq != NULL);
(void)tSerializeSCMCreateTopicReq(pReq, contLen, &createReq); (void)tSerializeSCMCreateTopicReq(pReq, contLen, &createReq);
*pContLen = contLen; *pContLen = contLen;
@ -82,7 +80,6 @@ void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen)
int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq); int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen); void* pReq = rpcMallocCont(contLen);
assert(pReq != NULL);
(void)tSerializeSMDropTopicReq(pReq, contLen, &dropReq); (void)tSerializeSMDropTopicReq(pReq, contLen, &dropReq);
*pContLen = contLen; *pContLen = contLen;

View File

@ -335,7 +335,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
STqHandle* pHandle = *(STqHandle**)pIter; STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
if (ASSERT(pHandle->msg != NULL)) { if (pHandle->msg == NULL) {
tqError("pHandle->msg should not be null"); tqError("pHandle->msg should not be null");
taosHashCancelIterate(pTq->pPushMgr, pIter); taosHashCancelIterate(pTq->pPushMgr, pIter);
break; break;
@ -777,7 +777,11 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); if(pChkInfo->checkpointVer > pChkInfo->nextProcessVer) {
tqError("vgId:%d build stream task, s-task:%s, checkpointVer:%" PRId64 " > nextProcessVer:%" PRId64,
vgId, pTask->id.idStr, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
} }
return 0; return 0;
@ -817,7 +821,9 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
", do secondary scan-history from WAL after halt the related stream task:%s", ", do secondary scan-history from WAL after halt the related stream task:%s",
id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey, id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
pStreamTask->id.idStr); pStreamTask->id.idStr);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); if (pTask->status.schedStatus != TASK_SCHED_STATUS__WAITING) {
tqError("s-task:%s level:%d unexpected sched-status:%d", id, pTask->info.taskLevel, pTask->status.schedStatus);
}
int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
if (code) { if (code) {
@ -950,7 +956,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// the following procedure should be executed, no matter status is stop/pause or not // the following procedure should be executed, no matter status is stop/pause or not
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El); tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
ASSERT(pTask->info.fillHistory == 1); if(pTask->info.fillHistory != 1) {
tqError("s-task:%s fill-history is disabled, unexpected", id);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
// 1. get the related stream task // 1. get the related stream task
SStreamTask* pStreamTask = NULL; SStreamTask* pStreamTask = NULL;
@ -967,7 +976,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return code; // todo: handle failure return code; // todo: handle failure
} }
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); if(pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq); code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);

View File

@ -386,7 +386,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
} }
} else { } else {
ASSERT(0); tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
} }
return code; return code;
@ -675,7 +676,11 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
pReader->cachedSchemaSuid = suid; pReader->cachedSchemaSuid = suid;
pReader->cachedSchemaVer = sversion; pReader->cachedSchemaVer = sversion;
ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version); if(pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
return TSDB_CODE_TQ_INTERNAL_ERROR;
}
if (blockDataGetNumOfCols(pBlock) == 0) { if (blockDataGetNumOfCols(pBlock) == 0) {
code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList); code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList);
TSDB_CHECK_CODE(code, line, END); TSDB_CHECK_CODE(code, line, END);

View File

@ -245,13 +245,22 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int32_t totalMetaRows = 0; int32_t totalMetaRows = 0;
while (1) { while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
ASSERT(savedEpoch <= pRequest->epoch); if(savedEpoch > pRequest->epoch) {
tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
code = TSDB_CODE_TQ_INTERNAL_ERROR;
goto END;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
if (totalMetaRows > 0) { if (totalMetaRows > 0) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
ASSERT(totalRows == 0); if(totalRows != 0) {
tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, totalRows error, vgId:%d offset %" PRId64,
pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
code = code == 0 ? TSDB_CODE_TQ_INTERNAL_ERROR : code;
}
goto END; goto END;
} }
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
@ -302,12 +311,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (!btMetaRsp.batchMetaReq) { if (!btMetaRsp.batchMetaReq) {
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
if (btMetaRsp.batchMetaReq == NULL) { if (btMetaRsp.batchMetaReq == NULL) {
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); code = TAOS_GET_TERRNO(terrno);
goto END; goto END;
} }
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
if (btMetaRsp.batchMetaLen == NULL) { if (btMetaRsp.batchMetaLen == NULL) {
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); code = TAOS_GET_TERRNO(terrno);
goto END; goto END;
} }
} }
@ -323,10 +332,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue; continue;
} }
int32_t tLen = sizeof(SMqRspHead) + len; int32_t tLen = sizeof(SMqRspHead) + len;
void* tBuf = taosMemoryCalloc(1, tLen); void* tBuf = taosMemoryCalloc(1, tLen);
if (tBuf == NULL) { if (tBuf == NULL){
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); code = TAOS_GET_TERRNO(terrno);
goto END; goto END;
} }
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
@ -339,12 +348,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
continue; continue;
} }
if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL) { if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL){
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); code = TAOS_GET_TERRNO(terrno);
goto END; goto END;
} }
if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL) { if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL){
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); code = TAOS_GET_TERRNO(terrno);
goto END; goto END;
} }
totalMetaRows++; totalMetaRows++;
@ -448,7 +457,7 @@ int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SM
int32_t tlen = sizeof(SMqRspHead) + len; int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); return TAOS_GET_TERRNO(terrno);
} }
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
@ -628,7 +637,8 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
} else if (type == 1) { } else if (type == 1) {
*pRefBlock = pDelBlock; *pRefBlock = pDelBlock;
} else { } else {
ASSERTS(0, "unknown type:%d", type); tqError("unknown type:%d", type);
code = TSDB_CODE_TMQ_CONSUMER_ERROR;
} }
END: END:

View File

@ -740,11 +740,14 @@ static int32_t setCreateViewResultIntoDataBlock(SSDataBlock* pBlock, SShowCreate
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1); SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
char* buf2 = taosMemoryMalloc(SHOW_CREATE_VIEW_RESULT_FIELD2_LEN); char* buf2 = taosMemoryMalloc(SHOW_CREATE_VIEW_RESULT_FIELD2_LEN);
if (NULL == buf2) { if (NULL == buf2) {
QRY_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); return terrno;
} }
SViewMeta* pMeta = pStmt->pViewMeta; SViewMeta* pMeta = pStmt->pViewMeta;
ASSERT(pMeta); if(NULL == pMeta) {
qError("exception: view meta is null");
return TSDB_CODE_APP_ERROR;
}
snprintf(varDataVal(buf2), SHOW_CREATE_VIEW_RESULT_FIELD2_LEN - VARSTR_HEADER_SIZE, "CREATE VIEW `%s`.`%s` AS %s", snprintf(varDataVal(buf2), SHOW_CREATE_VIEW_RESULT_FIELD2_LEN - VARSTR_HEADER_SIZE, "CREATE VIEW `%s`.`%s` AS %s",
pStmt->dbName, pStmt->viewName, pMeta->querySql); pStmt->dbName, pStmt->viewName, pMeta->querySql);
int32_t len = strlen(varDataVal(buf2)); int32_t len = strlen(varDataVal(buf2));

View File

@ -883,6 +883,10 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) { int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) {
if(data == NULL) {
uError("rawBlockBindData, data is NULL");
return TSDB_CODE_APP_ERROR;
}
void* tmp = void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;

View File

@ -140,7 +140,6 @@ void* rpcMallocCont(int64_t contLen) {
char* start = taosMemoryCalloc(1, size); char* start = taosMemoryCalloc(1, size);
if (start == NULL) { if (start == NULL) {
tError("failed to malloc msg, size:%" PRId64, size); tError("failed to malloc msg, size:%" PRId64, size);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} else { } else {
tTrace("malloc mem:%p size:%" PRId64, start, size); tTrace("malloc mem:%p size:%" PRId64, start, size);

View File

@ -558,6 +558,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_META_KEY_DUP_IN_TXN, "TQ met key dup in txn
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_GROUP_NOT_SET, "TQ group not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_GROUP_NOT_SET, "TQ group not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not found") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not found")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no committed offset") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no committed offset")
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_INTERNAL_ERROR, "TQ internal error")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")