diff --git a/cmake/cmake.define b/cmake/cmake.define index cb59e0bc5b..0d5c21604a 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE OFF) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index 7d9f49d3fa..ba4a404af6 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,8 +1,8 @@ # rocksdb ExternalProject_Add(rocksdb - GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git - GIT_TAG v6.23.3 + GIT_REPOSITORY https://github.com/facebook/rocksdb.git + GIT_TAG v8.1.1 SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" CONFIGURE_COMMAND "" BUILD_COMMAND "" diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 63e9e3799a..5b125b42d4 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -20,13 +20,13 @@ #include "tsimplehash.h" #include "tstreamFileState.h" +#ifndef _STREAM_STATE_H_ +#define _STREAM_STATE_H_ + #ifdef __cplusplus extern "C" { #endif -#ifndef _STREAM_STATE_H_ -#define _STREAM_STATE_H_ - // void* streamBackendInit(const char* path); // void streamBackendCleanup(void* arg); // SListNode* streamBackendAddCompare(void* backend, void* arg); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 697dda0dfd..ba4ef5a669 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,16 +13,13 @@ * along with this program. If not, see . */ -#include "executor.h" #include "os.h" -#include "query.h" #include "streamState.h" #include "tdatablock.h" #include "tdbInt.h" #include "tmsg.h" #include "tmsgcb.h" #include "tqueue.h" -#include "trpc.h" #ifdef __cplusplus extern "C" { @@ -339,7 +336,7 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; - SArray* pTaskList; // SArray + SArray* pTaskList; // SArray void* ahandle; TXN* txn; FTaskExpand* expandFunc; @@ -567,6 +564,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); +void streamMetaInit(); +void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ae06282b54..30a41afb65 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) { tscError("failed to send crash report"); if (pFile) { taosReleaseCrashLogFile(pFile, false); + pFile = NULL; continue; } } else { @@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) { if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); + pFile = NULL; truncateFile = false; } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index ea5b5846e4..c96ea77a3f 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -24,6 +24,8 @@ typedef struct { struct { int64_t clusterId; int32_t passKeyCnt; + int32_t passVer; + int32_t reqCnt; }; }; } SHbParam; @@ -540,14 +542,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } -static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { +static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); if (!pTscObj) { tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); return TSDB_CODE_APP_ERROR; } - int32_t code = 0; + int32_t code = 0; + + if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { + tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver); + goto _return; + } + SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); if (!user) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -574,6 +582,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { goto _return; } + // assign the passVer + if (param) { + param->passVer = pTscObj->passInfo.ver; + } + _return: releaseTscObj(connKey->tscRid); if (code) { @@ -719,13 +732,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { } int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { - SHbParam *hbParam = (SHbParam *)param; - struct SCatalog *pCatalog = NULL; + int32_t code = 0; + SHbParam *hbParam = (SHbParam *)param; + SCatalog *pCatalog = NULL; - int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); - return code; + if (hbParam->reqCnt == 0) { + code = catalogGetHandle(hbParam->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } } hbGetAppInfo(hbParam->clusterId, req); @@ -733,23 +749,27 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); if (hbParam->passKeyCnt > 0) { - hbGetUserBasicInfo(connKey, req); + hbGetUserBasicInfo(connKey, hbParam, req); } - code = hbGetExpiredUserInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; + if (hbParam->reqCnt == 0) { + code = hbGetExpiredUserInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = hbGetExpiredDBInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = hbGetExpiredStbInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } - code = hbGetExpiredDBInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - code = hbGetExpiredStbInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + ++hbParam->reqCnt; // success to get catalog info return TSDB_CODE_SUCCESS; } @@ -771,55 +791,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { } int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); - - int64_t rid = -1; - int32_t code = 0; - - void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); - - SClientHbReq *pOneReq = pIter; - SClientHbKey *connKey = pOneReq ? &pOneReq->connKey : NULL; - if (connKey != NULL) rid = connKey->tscRid; - - STscObj *pTscObj = (STscObj *)acquireTscObj(rid); - if (pTscObj == NULL) { + if (!pBatchReq->reqs) { tFreeClientHbBatchReq(pBatchReq); return NULL; } - while (pIter != NULL) { + void *pIter = NULL; + SHbParam param = {0}; + while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) { + SClientHbReq *pOneReq = pIter; + SClientHbKey *connKey = &pOneReq->connKey; + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + + if (!pTscObj) { + continue; + } + pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - SHbParam param; - switch (pOneReq->connKey.connType) { + + switch (connKey->connType) { case CONN_TYPE__QUERY: { - param.clusterId = pOneReq->clusterId; + if (param.clusterId == 0) { + // init + param.clusterId = pOneReq->clusterId; + param.passVer = INT32_MIN; + } param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); break; } default: break; } - if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) { - code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq); + if (clientHbMgr.reqHandle[connKey->connType]) { + int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, ¶m, pOneReq); if (code) { tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code), - pOneReq->connKey.tscRid, pOneReq->connKey.connType); + connKey->tscRid, connKey->connType); } } - break; -#if 0 - if (code) { - pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); - pOneReq = pIter; - continue; - } - - pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); - pOneReq = pIter; -#endif + releaseTscObj(connKey->tscRid); } - releaseTscObj(rid); return pBatchReq; } @@ -890,7 +902,6 @@ static void *hbThreadFunc(void *param) { hbGatherAppInfo(); } - SArray *mgr = taosArrayInit(sz, sizeof(void *)); for (int i = 0; i < sz; i++) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) { @@ -899,7 +910,6 @@ static void *hbThreadFunc(void *param) { int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); if (connCnt == 0) { - taosArrayPush(mgr, &pAppHbMgr); continue; } SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); @@ -913,7 +923,6 @@ static void *hbThreadFunc(void *param) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); - taosArrayPush(mgr, &pAppHbMgr); break; } @@ -925,7 +934,6 @@ static void *hbThreadFunc(void *param) { tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); taosMemoryFree(buf); - taosArrayPush(mgr, &pAppHbMgr); break; } pInfo->fp = hbAsyncCallBack; @@ -946,12 +954,8 @@ static void *hbThreadFunc(void *param) { // hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); - taosArrayPush(mgr, &pAppHbMgr); } - taosArrayDestroy(clientHbMgr.appHbMgrs); - clientHbMgr.appHbMgrs = mgr; - taosThreadMutexUnlock(&clientHbMgr.lock); taosMsleep(HEARTBEAT_INTERVAL); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 06b6221940..89c394fdd0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) { dError("failed to send crash report"); if (pFile) { taosReleaseCrashLogFile(pFile, false); + pFile = NULL; continue; } } else { @@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) { if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); + pFile = NULL; truncateFile = false; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index d884120147..544512233e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -18,6 +18,7 @@ #include "dmNodes.h" #include "index.h" #include "qworker.h" +#include "tstream.h" static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); @@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) { } indexInit(tsNumOfCommitThreads); + streamMetaInit(); dmReportStartup("dnode-transport", "initialized"); dDebug("dnode is created, ptr:%p", pDnode); @@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupServer(pDnode); dmClearVars(pDnode); rpcCleanup(); + streamMetaCleanup(); indexCleanup(); taosConvDestroy(); dDebug("dnode is closed, ptr:%p", pDnode); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index df7955771d..68b8dd7201 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pTrans, pTask) < 0) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } } @@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index cc2fffae03..2698b0bfd9 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -899,8 +899,10 @@ void destroyTimeSliceOperatorInfo(void* param) { } taosArrayDestroy(pInfo->pLinearInfo); - taosMemoryFree(pInfo->pPrevGroupKey->pData); - taosMemoryFree(pInfo->pPrevGroupKey); + if (pInfo->pPrevGroupKey) { + taosMemoryFree(pInfo->pPrevGroupKey->pData); + taosMemoryFree(pInfo->pPrevGroupKey); + } cleanupExprSupp(&pInfo->scalarSup); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 5d2970a4b7..0f39cf817b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -16,8 +16,6 @@ #ifndef _STREAM_BACKEDN_ROCKSDB_H_ #define _STREAM_BACKEDN_ROCKSDB_H_ -#include "executor.h" - #include "rocksdb/c.h" // #include "streamInc.h" #include "streamState.h" @@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); -void* streamStateCreateBatch(); -int32_t streamStateGetBatchSize(void* pBatch); -void streamStateClearBatch(void* pBatch); -void streamStateDestroyBatch(void* pBatch); -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); -int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); - // default cf int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); @@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch); void streamStateClearBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch); int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); + void* val, int32_t vlen, int64_t ttl); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 71fbe5e086..c471bc2bd8 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -16,9 +16,12 @@ #ifndef _STREAM_INC_H_ #define _STREAM_INC_H_ -//#include "executor.h" +#include "executor.h" +#include "query.h" #include "tstream.h" +#include "trpc.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index db4ec17b19..16ba81c74a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -13,8 +13,9 @@ * along with this program. If not, see . */ -// #include "streamStateRocksdb.h" #include "streamBackendRocksdb.h" +#include "executor.h" +#include "query.h" #include "tcommon.h" typedef struct SCompactFilteFactory { @@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) { taosMemoryFreeClear(err); } } else { + /* + list all cf and get prefix + */ int64_t streamId; int32_t taskId, dummpy = 0; SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) { void destroyCompactFilte(void* arg) { (void)arg; } unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { - // int64_t unixTime = taosGetTimestampMs(); - if (streamStateValueIsStale((char*)val)) { - return 1; - } - // SStreamValue value; - // memset(&value, 0, sizeof(value)); - // streamValueDecode(&value, (char*)val); - // taosMemoryFree(value.data); - // if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) { - // return 1; - // } - return 0; + return streamStateValueIsStale((char*)val) ? 1 : 0; } const char* compactFilteName(void* arg) { return "stream_filte"; } @@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { memcpy(cfNames[0], "default", strlen("default")); continue; } - qError("cf name %s", idstr); GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); if (i % cfLen == 0) { @@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); } } - for (int i = 0; i < nSize * cfLen + 1; i++) { - qError("cf name %s", cfNames[i]); - } rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); for (int i = 0; i < nSize * cfLen + 1; i++) { @@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); - // return -1; } } pState->pTdbState->rocksdb = handle->db; @@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ - qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ - if (err != NULL) taosMemoryFree(err); \ - code = -1; \ - } else { \ - char * p = NULL, *end = NULL; \ - int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \ - if (len < 0) { \ - qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \ - code = -1; \ - } else { \ - qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \ - } \ - if (pVal != NULL) { \ - *pVal = p; \ - } else { \ - taosMemoryFree(p); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = len; \ - } \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL) { \ + if (err == NULL) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ + funcname); \ + } else { \ + qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (len < 0) { \ + qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ + funcname); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ + len); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = len; \ + } \ + if (code == 0) \ + qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, // eLen); if (err != NULL) { - qWarn( - "failed to delete range cf(state) err: %s, " - "start: %s, end:%s", - err, toStringStart, toStringEnd); + qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); taosMemoryFree(err); } @@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { return -1; } - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + size_t klen, vlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen); winKeyDecode(&winKey, keyStr); - size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - char* dst = NULL; - int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); + // char* dst = NULL; + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal); if (len < 0) { return -1; } - - if (pVal != NULL) *pVal = (char*)dst; - if (pVLen != NULL) *pVLen = vlen; + if (pVLen != NULL) *pVLen = len; *pKey = winKey; return 0; @@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) { void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen) { + void* val, int32_t vlen, int64_t ttl) { int i = streamGetInit(cfName); if (i < 0) { @@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr int32_t klen = ginitDict[i].enFunc((void*)key, buf); char* ttlV = NULL; - int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a437e15ffc..e05660da32 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,12 +20,12 @@ #define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { - int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } bool streamTaskShouldPause(const SStreamStatus* pStatus) { - int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__PAUSE); } @@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, atomic_load_8(&pTask->status.taskStatus)); taosMsleep(2); @@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, - pSubmit->submit.msgLen, pSubmit->submit.ver); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; SArray* pBlockList = pBlock->blocks; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; @@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->blocks = pRes; code = streamTaskOutput(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - taosFreeQitem(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); return code; } @@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t ckId = 0; int64_t dataVer = 0; qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated + if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); - pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; + pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); @@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index de56cf24ca..682ce08c7f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -19,6 +19,13 @@ #include "tref.h" #include "ttimer.h" +static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; +static int32_t streamBackendId = 0; +static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } + +void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaCleanup() { taosCloseRef(streamBackendId); } + SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = taosStrdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { - taosMemoryFree(streamPath); goto _err; } + memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(streamPath); goto _err; } - taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { goto _err; @@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; + pMeta->streamBackendId = streamBackendId; - char* statePath = taosMemoryCalloc(1, len); - sprintf(statePath, "%s/%s", pMeta->path, "state"); - code = taosMulModeMkDir(statePath, 0755); + memset(streamPath, 0, len); + sprintf(streamPath, "%s/%s", pMeta->path, "state"); + code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(streamPath); goto _err; } - pMeta->streamBackend = streamBackendInit(statePath); - pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup); - pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend); + pMeta->streamBackend = streamBackendInit(streamPath); + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - taosMemoryFree(statePath); + taosMemoryFree(streamPath); taosInitRWLatch(&pMeta->lock); return pMeta; _err: + taosMemoryFree(streamPath); taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); @@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); - taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid); - // streamBackendCleanup(pMeta->streamBackend); - taosCloseRef(pMeta->streamBackendId); + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); @@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { SStreamTask* pTask = *ppTask; - - // taosWLockLatch(&pMeta->lock); - taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); - // atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); int32_t num = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1cca4d55cf..373cb27941 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int pState->taskId = pTask->id.taskId; pState->streamId = pTask->id.streamId; #ifdef USE_ROCKSDB - qWarn("open stream state1"); + // qWarn("open stream state1"); taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); if (code == -1) { @@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) { #ifdef USE_ROCKSDB // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); + taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) { tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); #endif - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); } int32_t streamStateBegin(SStreamState* pState) { @@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo int32_t code = 0; void* batch = streamStateCreateBatch(); - code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); + code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0); if (code != 0) { return code; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b7401ec5d9..67835e77b8 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -15,6 +15,7 @@ #include "tstreamFileState.h" +#include "query.h" #include "streamBackendRocksdb.h" #include "taos.h" #include "tcommon.h" @@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) { clearExpiredRowBuff(pFileState, 0, true); } -bool needClearDiskBuff(SStreamFileState* pFileState) { - return pFileState->flushMark > 0; -} +bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; } void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { uint64_t i = 0; @@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; } SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { - clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + clearExpiredRowBuff(pFileState, mark, false); return pFileState->usedBuffs; } @@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; - code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); + code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } if (streamStateGetBatchSize(batch) > 0) { @@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); taosMemoryFree(valBuf); } { @@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; memcpy(keyBuf, taskKey, strlen(taskKey)); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); } streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } @@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; - deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); + int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN + : pFileState->maxTs - pFileState->deleteMark; + deleteExpiredCheckPoint(pFileState, mark); void* pStVal = NULL; int32_t len = 0; diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 5a97ba45f6..a0c1717690 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") TARGET_LINK_LIBRARIES( streamUpdateTest - PUBLIC os util common gtest stream + PUBLIC os util common gtest gtest_main stream ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index c698187874..18c60aff28 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -1,11 +1,28 @@ #include +#include "streamBackendRocksdb.h" +#include "tstream.h" #include "tstreamUpdate.h" #include "ttime.h" using namespace std; #define MAX_NUM_SCALABLE_BF 100000 +class StreamStateEnv : public ::testing::Test { + protected: + virtual void SetUp() { + streamMetaInit(); + backend = streamBackendInit(path); + } + virtual void TearDown() { + streamMetaCleanup(); + // indexClose(index); + } + + const char *path = TD_TMP_DIR_PATH "stream"; + void *backend; +}; + bool equalSBF(SScalableBf *left, SScalableBf *right) { if (left->growth != right->growth) return false; if (left->numBits != right->numBits) return false; @@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) { // updateInfoDestroy(pSU6); // updateInfoDestroy(pSU7); } - -int main(int argc, char *argv[]) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} \ No newline at end of file +// TEST() +TEST(StreamStateEnv, test1) {} +// int main(int argc, char *argv[]) { +// testing::InitGoogleTest(&argc, argv); +// return RUN_ALL_TESTS(); +// } \ No newline at end of file diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index db2a22205f..0cf291523a 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,36 +657,33 @@ if $data20 != null then return -1 endi -#print =============== error for normal table -#sql create table tb2023(ts timestamp, f int); -#sql_error alter table tb2023 add column v varchar(65535); -#sql_error alter table tb2023 add column v varchar(65535); -#sql_error alter table tb2023 add column v varchar(65530); -#sql alter table tb2023 add column v varchar(16374); -#sql_error alter table tb2023 modify column v varchar(65536); -#sql desc tb2023 -#sql alter table tb2023 drop column v -#sql_error alter table tb2023 add column v nchar(16384); -#sql alter table tb2023 add column v nchar(4093); -#sql_error alter table tb2023 modify column v nchar(16384); -#sql_error alter table tb2023 add column v nchar(16384); -#sql alter table tb2023 drop column v -#sql alter table tb2023 add column v nchar(16374); -#sql desc tb2023 -# -#print =============== error for super table -#sql create table stb2023(ts timestamp, f int) tags(t1 int); -#sql_error alter table stb2023 add column v varchar(65535); -#sql_error alter table stb2023 add column v varchar(65536); -#sql_error alter table stb2023 add column v varchar(33100); -#sql alter table stb2023 add column v varchar(16374); -#sql_error alter table stb2023 modify column v varchar(16375); -#sql desc stb2023 -#sql alter table stb2023 drop column v -#sql_error alter table stb2023 add column v nchar(4094); -#sql alter table stb2023 add column v nchar(4093); -#sql_error alter table stb2023 modify column v nchar(4094); -#sql desc stb2023 +print =============== error for normal table +sql create table tb2023(ts timestamp, f int); +sql_error alter table tb2023 add column v varchar(65518); +sql_error alter table tb2023 add column v varchar(65531); +sql_error alter table tb2023 add column v varchar(65535); +sql alter table tb2023 add column v varchar(65517); +sql_error alter table tb2023 modify column v varchar(65518); +sql desc tb2023 +sql alter table tb2023 drop column v +sql_error alter table tb2023 add column v nchar(16380); +sql alter table tb2023 add column v nchar(16379); +sql_error alter table tb2023 modify column v nchar(16380); +sql desc tb2023 + +print =============== error for super table +sql create table stb2023(ts timestamp, f int) tags(t1 int); +sql_error alter table stb2023 add column v varchar(65518); +sql_error alter table stb2023 add column v varchar(65531); +sql_error alter table stb2023 add column v varchar(65535); +sql alter table stb2023 add column v varchar(65517); +sql_error alter table stb2023 modify column v varchar(65518); +sql desc stb2023 +sql alter table stb2023 drop column v +sql_error alter table stb2023 add column v nchar(16380); +sql alter table stb2023 add column v nchar(16379); +sql_error alter table stb2023 modify column v nchar(16380); +sql desc stb2023 print ======= over sql drop database d1 diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index f892115735..2bf369b910 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql_error alter table tb modify column c2 binary(65600); +sql_error alter table tb modify column c2 binary(65436); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10);