fix(gpt): fix error in gpt (#30275)

* fix(stream): support packaging enterprise edition.

* feat(gpt): support lstm and do some internal refactor, add sample autoencoder model.

* test(gpt): disable model case.

* doc: fix title error in doc.

* doc: add mlp doc.

* fix(gpt): add gpt

* fix(gpt): update the test cases.

* test(gpt): disable gpt test in github action.

* fix(gpt): check the grant, check the rsp code.

* fix(gpt): limit the forecast rows.

* fix(gpt): use default gpt service

* fix(gpt): update error code.
Haojun Liao 2025-03-20 15:18:58 +08:00 committed by GitHub
parent 59f45345fb
commit cd10173454
11 changed files with 103 additions and 36 deletions

View File

@@ -29,9 +29,10 @@ extern "C" {
 #define ANALY_FORECAST_DEFAULT_CONF 95
 #define ANALY_FORECAST_DEFAULT_WNCHECK 1
 #define ANALY_FORECAST_MAX_ROWS 40000
+#define ANALY_FORECAST_RES_MAX_ROWS 1024
 #define ANALY_ANOMALY_WINDOW_MAX_ROWS 40000
 #define ANALY_DEFAULT_TIMEOUT 60
 #define ANALY_MAX_TIMEOUT 600

 typedef struct {
   EAnalAlgoType type;

View File

@@ -514,6 +514,7 @@ int32_t taosGetErrSize();
 #define TSDB_CODE_ANA_ANODE_RETURN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445)
 #define TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS TAOS_DEF_ERROR_CODE(0, 0x0446)
 #define TSDB_CODE_ANA_WN_DATA TAOS_DEF_ERROR_CODE(0, 0x0447)
+#define TSDB_CODE_ANA_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0448)

 // mnode-sma
 #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@@ -434,7 +434,7 @@ static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int
 static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalyJsonBufWriteStr(pBuf, "{\n", 0); }

-static int32_t tsosAnalJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
+static int32_t tsosAnalyJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
   pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
   if (pBuf->filePtr == NULL) {
     return terrno;
@@ -666,7 +666,7 @@ void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
 int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
   if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
-    return tsosAnalJsonBufOpen(pBuf, numOfCols);
+    return tsosAnalyJsonBufOpen(pBuf, numOfCols);
   } else {
     return TSDB_CODE_ANA_BUF_INVALID_TYPE;
   }

View File

@@ -371,6 +371,11 @@ static int32_t mndProcessCreateAnodeReq(SRpcMsg *pReq) {
   SAnodeObj *pObj = NULL;
   SMCreateAnodeReq createReq = {0};

+  if ((code = grantCheck(TSDB_GRANT_TD_GPT)) != TSDB_CODE_SUCCESS) {
+    mError("failed to create anode, code:%s", tstrerror(code));
+    goto _OVER;
+  }
+
   TAOS_CHECK_GOTO(tDeserializeSMCreateAnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
   mInfo("anode:%s, start to create", createReq.url);

View File

@@ -327,7 +327,7 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows, const char* pId)
       qError("%s failed to exec forecast, msg:%s", pId, pMsg);
     }
-    return TSDB_CODE_ANA_WN_DATA;
+    return TSDB_CODE_ANA_INTERNAL_ERROR;
   } else if (rows == 0) {
     return TSDB_CODE_SUCCESS;
   }
@@ -382,7 +382,7 @@ static int32_t anomalyAnalysisWindow(SOperatorInfo* pOperator) {
   SAnalyticBuf analyBuf = {.bufType = ANALYTICS_BUF_TYPE_JSON};
   char dataBuf[64] = {0};
   int32_t code = 0;
-  int64_t ts = 0;
+  int64_t ts = taosGetTimestampMs();
   int32_t lino = 0;
   const char* pId = GET_TASKID(pOperator->pTaskInfo);

View File

@@ -158,8 +158,8 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
     qDebug("%s forecast rows not found from %s, use default:%" PRId64, id, pSupp->algoOpt, pSupp->optRows);
   }

-  if (pSupp->optRows > ANALY_FORECAST_MAX_ROWS) {
-    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_MAX_ROWS,
+  if (pSupp->optRows > ANALY_FORECAST_RES_MAX_ROWS) {
+    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
            pSupp->optRows);
     return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
   }
@@ -235,7 +235,7 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const
     }

     tjsonDelete(pJson);
-    return TSDB_CODE_ANA_WN_DATA;
+    return TSDB_CODE_ANA_INTERNAL_ERROR;
   }

   if (code < 0) {

View File

@@ -377,6 +377,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ANA_BUF_INVALID_TYPE, "Analysis invalid buffe
 TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_RETURN_ERROR, "Analysis failed since anode return error")
 TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode")
 TAOS_DEFINE_ERROR(TSDB_CODE_ANA_WN_DATA, "white-noise data not processed")
+TAOS_DEFINE_ERROR(TSDB_CODE_ANA_INTERNAL_ERROR, "tdgpt internal error, not processed")

 // mnode-sma
 TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")

View File

@@ -83,6 +83,9 @@ class _ArimaService(AbstractForecastService):
         if self.list is None or len(self.list) < self.period:
             raise ValueError("number of input data is less than the periods")

+        if len(self.list) > 3000:
+            raise ValueError("number of input data is too large")
+
         if self.fc_rows <= 0:
             raise ValueError("fc rows is not specified yet")

View File

@@ -17,7 +17,7 @@ class _GPTService(AbstractForecastService):
         super().__init__()

         self.table_name = None
-        self.service_host = 'http://192.168.2.90:5000/ds_predict'
+        self.service_host = 'http://127.0.0.1:5000/ds_predict'
         self.headers = {'Content-Type': 'application/json'}

         self.std = None
@@ -39,13 +39,16 @@ class _GPTService(AbstractForecastService):
             response = requests.post(self.service_host, data=json.dumps(data), headers=self.headers)
         except Exception as e:
             app_logger.log_inst.error(f"failed to connect the service: {self.service_host} ", str(e))
-            raise ValueError("error")
+            raise e

-        # print(response)
+        if response.status_code == 404:
+            app_logger.log_inst.error(f"failed to connect the service: {self.service_host} ")
+            raise ValueError("invalid host url")
+        elif response.status_code != 200:
+            app_logger.log_inst.error(f"failed to request the service: {self.service_host}, reason: {response.text}")
+            raise ValueError(f"failed to request the service, {response.text}")

         pred_y = response.json()['output']
-        # print(f"pred_y len:{len(pred_y)}")
-        # print(f"pred_y:{pred_y}")

         res = {
             "res": [pred_y]
@@ -54,31 +57,17 @@ class _GPTService(AbstractForecastService):
         insert_ts_list(res["res"], self.start_ts, self.time_step, self.fc_rows)
         return res

-        # insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows)
-        #
-        # if self.return_conf:
-        #     res1 = [res.tolist(), res.tolist(), res.tolist()], None
-        # else:
-        #     res1 = [res.tolist()], None
-        #
-        # # add the conf range if required
-        # return {
-        #     "mse": None,
-        #     "res": res1
-        # }
-
     def set_params(self, params):
         super().set_params(params)

-        if "host" not in params:
-            raise ValueError("gpt service host needs to be specified")
-
-        self.service_host = params['host'].trim()
-
-        if self.service_host.startswith("https://"):
-            self.service_host = self.service_host.replace("https://", "http://")
-        elif "http://" not in self.service_host:
-            self.service_host = "http://" + self.service_host
+        if "host" in params:
+            self.service_host = params['host']
+            if self.service_host.startswith("https://"):
+                self.service_host = self.service_host.replace("https://", "http://")
+            elif "http://" not in self.service_host:
+                self.service_host = "http://" + self.service_host

         app_logger.log_inst.info("%s specify gpt host service: %s", self.__class__.__name__,
                                  self.service_host)
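With this change the GPT forecast service no longer requires a host parameter: set_params keeps the default service URL unless a host is supplied, downgrades https to http, and prefixes a bare host with http://. A minimal standalone sketch of that behaviour, assuming a helper named normalize_host and the default URL used above (both illustrative, not part of the commit):

# Illustrative sketch only; mirrors the set_params host handling shown above.
def normalize_host(params, default='http://127.0.0.1:5000/ds_predict'):
    host = params.get('host', default)  # host is now optional
    if host.startswith('https://'):
        host = host.replace('https://', 'http://')  # the service speaks plain http
    elif 'http://' not in host:
        host = 'http://' + host  # accept a bare "ip:port/path" value
    return host

# normalize_host({'host': '192.168.2.90:5000/ds_predict'}) -> 'http://192.168.2.90:5000/ds_predict'
# normalize_host({}) -> 'http://127.0.0.1:5000/ds_predict'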

View File

@@ -8,7 +8,7 @@ import pandas as pd
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")

 from taosanalytics.algo.forecast import draw_fc_results
-from taosanalytics.conf import setup_log_info
+from taosanalytics.conf import setup_log_info, app_logger
 from taosanalytics.servicemgmt import loader
@@ -30,7 +30,8 @@ class ForecastTest(unittest.TestCase):
         ts_list = data[['Passengers']].index.tolist()
         dst_list = [int(item.timestamp()) for item in ts_list]

-        return data[['Passengers']].values.tolist(), dst_list
+        return data['Passengers'].values.tolist(), dst_list

     def test_holt_winters_forecast(self):
         """ test holt winters forecast with invalid and then valid parameters"""
@@ -111,5 +112,20 @@ class ForecastTest(unittest.TestCase):
             draw_fc_results(data, len(r["res"]) > 1, r["res"], rows, "arima")

+    def test_gpt_fc(self):
+        """for local test only, disabled it in github action"""
+        data, ts = self.get_input_list()
+        pass
+
+        # s = loader.get_service("td_gpt_fc")
+        # s.set_input_list(data, ts)
+        #
+        # s.set_params({"host":'192.168.2.90:5000/ds_predict', 'fc_rows': 10, 'start_ts': 171000000, 'time_step': 86400*30})
+        # r = s.execute()
+        #
+        # rows = len(r["res"][0])
+        # draw_fc_results(data, False, r["res"], rows, "gpt")
+
 if __name__ == '__main__':
     unittest.main()
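The body of test_gpt_fc above is intentionally commented out so it does not run in the GitHub action. A hedged sketch of how it could be exercised locally against a reachable tdgpt endpoint (the host value here is only a placeholder):

# Local-only sketch, mirroring the commented test body above (not part of the commit).
s = loader.get_service("td_gpt_fc")
s.set_input_list(data, ts)
s.set_params({"host": '127.0.0.1:5000/ds_predict', 'fc_rows': 10,
              'start_ts': 171000000, 'time_step': 86400 * 30})
r = s.execute()

rows = len(r["res"][0])
draw_fc_results(data, False, r["res"], rows, "gpt")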

View File

@@ -257,3 +257,54 @@ class RestfulTest(TestCase):
         self.assertEqual(response.status_code, 200)
         self.assertEqual(response.json["rows"], -1)
def test_gpt_restful_service(self):
response = self.client.post('/forecast', json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4]
],
"data": [
[
1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000,
1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000,
1577808015000, 1577808016000, 1577808017000, 1577808018000, 1577808019000,
1577808020000, 1577808021000, 1577808022000, 1577808023000, 1577808024000,
1577808025000, 1577808026000, 1577808027000, 1577808028000, 1577808029000,
1577808030000, 1577808031000, 1577808032000, 1577808033000, 1577808034000,
1577808035000, 1577808036000, 1577808037000, 1577808038000, 1577808039000,
1577808040000, 1577808041000, 1577808042000, 1577808043000, 1577808044000,
1577808045000, 1577808046000, 1577808047000, 1577808048000, 1577808049000,
1577808050000, 1577808051000, 1577808052000, 1577808053000, 1577808054000,
1577808055000, 1577808056000, 1577808057000, 1577808058000, 1577808059000,
1577808060000, 1577808061000, 1577808062000, 1577808063000, 1577808064000,
1577808065000, 1577808066000, 1577808067000, 1577808068000, 1577808069000,
1577808070000, 1577808071000, 1577808072000, 1577808073000, 1577808074000,
1577808075000, 1577808076000, 1577808077000, 1577808078000, 1577808079000,
1577808080000, 1577808081000, 1577808082000, 1577808083000, 1577808084000,
1577808085000, 1577808086000, 1577808087000, 1577808088000, 1577808089000,
1577808090000, 1577808091000, 1577808092000, 1577808093000, 1577808094000,
1577808095000
],
[
13, 14, 8, 10, 16, 26, 32, 27, 18, 32, 36, 24, 22, 23, 22, 18, 25, 21, 21,
14, 8, 11, 14, 23, 18, 17, 19, 20, 22, 19, 13, 26, 13, 14, 22, 24, 21, 22,
26, 21, 23, 24, 27, 41, 31, 27, 35, 26, 28, 36, 39, 21, 17, 22, 17, 19, 15,
34, 10, 15, 22, 18, 15, 20, 15, 22, 19, 16, 30, 27, 29, 23, 20, 16, 21, 21,
25, 16, 18, 15, 18, 14, 10, 15, 8, 15, 6, 11, 8, 7, 13, 10, 23, 16, 15, 25
]
],
"option": "algo=td_gpt_fc",
"algo": "td_gpt_fc",
"prec": "ms",
"wncheck": 0,
"return_conf": 0,
"forecast_rows": 10,
"conf": 95,
"start": 1577808096000,
"every": 1000,
"rows": 21,
"protocol": 1.0
})
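For reference, the same kind of /forecast payload can be posted to a running analysis node from a plain script instead of the Flask test client. A hedged sketch with a shortened data set (the URL and port are placeholders, not taken from the commit):

# Hedged sketch: calling the forecast endpoint directly with requests.
import requests

payload = {
    "schema": [["ts", "TIMESTAMP", 8], ["val", "INT", 4]],
    "data": [
        [1577808000000, 1577808001000, 1577808002000],  # timestamps, ms precision
        [13, 14, 8],                                     # values
    ],
    "option": "algo=td_gpt_fc",
    "algo": "td_gpt_fc",
    "prec": "ms",
    "wncheck": 0,
    "return_conf": 0,
    "forecast_rows": 10,
    "conf": 95,
    "start": 1577808003000,
    "every": 1000,
    "rows": 3,
    "protocol": 1.0,
}

resp = requests.post("http://127.0.0.1:6090/forecast", json=payload)  # placeholder URL
print(resp.status_code, resp.json())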