Merge branch 'feat/TS-4243-3.0' of github.com:taosdata/TDengine into TEST/3.0/TS-4243
This commit is contained in:
commit
2b623d1952
|
@ -29,21 +29,7 @@ typedef struct SCorEpSet {
|
||||||
|
|
||||||
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
|
||||||
|
|
||||||
#define EPSET_TO_STR(_eps, tbuf) \
|
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len);
|
||||||
do { \
|
|
||||||
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
|
|
||||||
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
|
|
||||||
if (_i == (_eps)->numOfEps - 1) { \
|
|
||||||
len += \
|
|
||||||
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
|
|
||||||
} else { \
|
|
||||||
len += \
|
|
||||||
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
|
|
||||||
} \
|
|
||||||
} \
|
|
||||||
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
||||||
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
||||||
|
|
||||||
|
|
|
@ -328,7 +328,7 @@ typedef struct SStateStore {
|
||||||
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
|
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
|
||||||
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
|
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t (*streamStateClear)(SStreamState* pState);
|
int32_t (*streamStateClear)(SStreamState* pState);
|
||||||
void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
|
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
|
||||||
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||||
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ struct SResultRowEntryInfo;
|
||||||
|
|
||||||
struct SFunctionNode;
|
struct SFunctionNode;
|
||||||
typedef struct SScalarParam SScalarParam;
|
typedef struct SScalarParam SScalarParam;
|
||||||
|
typedef struct SStreamState SStreamState;
|
||||||
|
|
||||||
typedef struct SFuncExecEnv {
|
typedef struct SFuncExecEnv {
|
||||||
int32_t calcMemSize;
|
int32_t calcMemSize;
|
||||||
|
@ -127,7 +128,7 @@ typedef struct SInputColumnInfoData {
|
||||||
typedef struct SSerializeDataHandle {
|
typedef struct SSerializeDataHandle {
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
int32_t currentPage;
|
int32_t currentPage;
|
||||||
void *pState;
|
SStreamState *pState;
|
||||||
} SSerializeDataHandle;
|
} SSerializeDataHandle;
|
||||||
|
|
||||||
// incremental state storage
|
// incremental state storage
|
||||||
|
@ -165,7 +166,7 @@ typedef struct STdbState {
|
||||||
void *txn;
|
void *txn;
|
||||||
} STdbState;
|
} STdbState;
|
||||||
|
|
||||||
typedef struct {
|
struct SStreamState {
|
||||||
STdbState *pTdbState;
|
STdbState *pTdbState;
|
||||||
struct SStreamFileState *pFileState;
|
struct SStreamFileState *pFileState;
|
||||||
int32_t number;
|
int32_t number;
|
||||||
|
@ -174,7 +175,8 @@ typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int64_t streamBackendRid;
|
int64_t streamBackendRid;
|
||||||
int8_t dump;
|
int8_t dump;
|
||||||
} SStreamState;
|
int32_t tsIndex;
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct SFunctionStateStore {
|
typedef struct SFunctionStateStore {
|
||||||
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
|
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
|
||||||
|
|
|
@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
|
||||||
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
|
||||||
int32_t streamStateClear(SStreamState* pState);
|
int32_t streamStateClear(SStreamState* pState);
|
||||||
void streamStateSetNumber(SStreamState* pState, int32_t number);
|
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
|
||||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
|
||||||
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
|
||||||
|
|
||||||
|
|
|
@ -516,7 +516,6 @@ typedef struct SStreamMeta {
|
||||||
TdThreadMutex backendMutex;
|
TdThreadMutex backendMutex;
|
||||||
SMetaHbInfo* pHbInfo;
|
SMetaHbInfo* pHbInfo;
|
||||||
STaskUpdateInfo updateInfo;
|
STaskUpdateInfo updateInfo;
|
||||||
SHashObj* pUpdateTaskSet;
|
|
||||||
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
||||||
int32_t numOfPausedTasks;
|
int32_t numOfPausedTasks;
|
||||||
int64_t rid;
|
int64_t rid;
|
||||||
|
|
|
@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState;
|
||||||
typedef SList SStreamSnapshot;
|
typedef SList SStreamSnapshot;
|
||||||
|
|
||||||
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
||||||
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
|
|
||||||
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
|
||||||
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
|
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
|
||||||
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
|
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
|
||||||
|
@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi
|
||||||
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
|
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
|
||||||
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
|
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
|
||||||
|
|
||||||
|
typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
|
||||||
|
|
||||||
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
|
||||||
|
@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
|
||||||
|
|
||||||
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
|
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
|
||||||
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
|
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
|
||||||
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
|
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
|
||||||
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
|
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
|
||||||
|
|
||||||
void* getRowStateBuff(SStreamFileState* pFileState);
|
void* getRowStateBuff(SStreamFileState* pFileState);
|
||||||
|
@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
||||||
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
|
||||||
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
|
//function
|
||||||
|
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -22,10 +22,10 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int64_t taosStrHumanToInt64(const char* str);
|
int32_t taosStrHumanToInt64(const char* str, int64_t* out);
|
||||||
void taosInt64ToHumanStr(int64_t val, char* outStr);
|
void taosInt64ToHumanStr(int64_t val, char* outStr);
|
||||||
|
|
||||||
int32_t taosStrHumanToInt32(const char* str);
|
int32_t taosStrHumanToInt32(const char* str, int32_t* out);
|
||||||
void taosInt32ToHumanStr(int32_t val, char* outStr);
|
void taosInt32ToHumanStr(int32_t val, char* outStr);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -56,6 +56,8 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
|
||||||
|
|
||||||
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
|
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
|
||||||
|
|
||||||
|
int32_t parseCfgReal(const char* str, double* out);
|
||||||
|
|
||||||
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
||||||
T_MD5_CTX context;
|
T_MD5_CTX context;
|
||||||
tMD5Init(&context);
|
tMD5Init(&context);
|
||||||
|
|
|
@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
|
||||||
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void epAssign(SEp* pDst, SEp* pSrc) {
|
void epAssign(SEp* pDst, SEp* pSrc) {
|
||||||
if (pSrc == NULL || pDst == NULL) {
|
if (pSrc == NULL || pDst == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) {
|
||||||
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
|
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
|
||||||
pDst->port = pSrc->port;
|
pDst->port = pSrc->port;
|
||||||
}
|
}
|
||||||
|
|
||||||
void epsetSort(SEpSet* pDst) {
|
void epsetSort(SEpSet* pDst) {
|
||||||
if (pDst->numOfEps <= 1) {
|
if (pDst->numOfEps <= 1) {
|
||||||
return;
|
return;
|
||||||
|
@ -127,6 +129,34 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) {
|
||||||
return ep;
|
return ep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) {
|
||||||
|
int len = snprintf(pBuf, bufLen, "epset:{");
|
||||||
|
if (len < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
if (_i == pEpSet->numOfEps - 1) {
|
||||||
|
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
|
||||||
|
} else {
|
||||||
|
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
len += ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len < bufLen) {
|
||||||
|
/*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
|
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
|
||||||
SJson* pJson = tjsonCreateObject();
|
SJson* pJson = tjsonCreateObject();
|
||||||
if (pJson == NULL) return -1;
|
if (pJson == NULL) return -1;
|
||||||
|
|
|
@ -1788,7 +1788,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&pCurrent->epset, buf);
|
epsetToStr(&pCurrent->epset, buf, tListLen(buf));
|
||||||
|
|
||||||
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
||||||
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
||||||
|
|
||||||
|
@ -1939,7 +1940,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
|
||||||
taosArrayPush(plist, pEntry);
|
taosArrayPush(plist, pEntry);
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&pEntry->epset, buf);
|
epsetToStr(&pEntry->epset, buf, tListLen(buf));
|
||||||
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
|
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pHash);
|
taosHashCleanup(pHash);
|
||||||
|
|
|
@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&entry.epset, buf);
|
epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
|
||||||
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||||
|
@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
entry.nodeId = SNODE_HANDLE;
|
entry.nodeId = SNODE_HANDLE;
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&entry.epset, buf);
|
epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
|
@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
EPSET_TO_STR(&epset, buf);
|
epsetToStr(&epset, buf, tListLen(buf));
|
||||||
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
|
||||||
|
|
||||||
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||||
|
|
|
@ -30,11 +30,11 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct SSnode {
|
struct SSnode {
|
||||||
char* path;
|
char* path;
|
||||||
SStreamMeta* pMeta;
|
SStreamMeta* pMeta;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
} SSnode;
|
};
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
||||||
}
|
}
|
||||||
|
|
||||||
// extract the required source task for a given stream, identified by streamId
|
// extract the required source task for a given stream, identified by streamId
|
||||||
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
|
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
if (pId->streamId != streamId) {
|
if (pId->streamId != streamId) {
|
||||||
|
@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
|
||||||
walCloseReader(pReader);
|
walCloseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
bool scanWal = false;
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
if (pStartInfo->taskStarting == 1) {
|
if (pStartInfo->taskStarting == 1) {
|
||||||
|
@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
pStartInfo->restartCount = 0;
|
pStartInfo->restartCount = 0;
|
||||||
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scanWal = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
if (scanWal && (vgId != SNODE_HANDLE)) {
|
||||||
|
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
|
||||||
|
tqScanWalAsync(pMeta->ahandle, true);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -399,28 +399,31 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
||||||
rows - i);
|
rows - i);
|
||||||
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
|
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
|
||||||
|
|
||||||
SValue vFirst = {0}, vLast = {0};
|
if (block.numOfPKs > 0) {
|
||||||
for (int32_t f = i; f < rows; ++f) {
|
SValue vFirst = {0}, vLast = {0};
|
||||||
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
|
for (int32_t f = i; f < rows; ++f) {
|
||||||
if (code) {
|
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
|
||||||
break;
|
if (code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tValueDupPayload(&vFirst);
|
||||||
|
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
||||||
|
|
||||||
|
// todo add api to clone the original data
|
||||||
|
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
|
||||||
|
if (code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tValueDupPayload(&vLast);
|
||||||
|
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
|
||||||
}
|
}
|
||||||
|
|
||||||
tValueDupPayload(&vFirst);
|
|
||||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
|
||||||
|
|
||||||
// todo add api to clone the original data
|
|
||||||
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
|
|
||||||
if (code) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
tValueDupPayload(&vLast);
|
|
||||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
STbStatisRecord record;
|
STbStatisRecord record = {0};
|
||||||
|
|
||||||
while (i < rows) {
|
while (i < rows) {
|
||||||
tStatisBlockGet(&block, i, &record);
|
tStatisBlockGet(&block, i, &record);
|
||||||
if (record.suid != suid) {
|
if (record.suid != suid) {
|
||||||
|
@ -433,15 +436,18 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
||||||
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
|
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
|
||||||
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
|
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
|
||||||
|
|
||||||
SValue s = record.firstKey.pks[0];
|
if (record.firstKey.numOfPKs > 0) {
|
||||||
tValueDupPayload(&s);
|
SValue s = record.firstKey.pks[0];
|
||||||
|
tValueDupPayload(&s);
|
||||||
|
|
||||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
|
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
|
||||||
|
|
||||||
s = record.lastKey.pks[0];
|
s = record.lastKey.pks[0];
|
||||||
tValueDupPayload(&s);
|
tValueDupPayload(&s);
|
||||||
|
|
||||||
|
taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
|
|
||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -909,7 +909,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
||||||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||||
SStorageAPI* pApi);
|
SStorageAPI* pApi, int32_t tsIndex);
|
||||||
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||||
STimeWindowAggSupp* pTwSup);
|
STimeWindowAggSupp* pTwSup);
|
||||||
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
||||||
|
|
|
@ -3091,7 +3091,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->pUpdateInfo = NULL;
|
pInfo->pUpdateInfo = NULL;
|
||||||
pInfo->pTableScanOp = pTableScanOp;
|
pInfo->pTableScanOp = pTableScanOp;
|
||||||
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
|
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readHandle = *pHandle;
|
pInfo->readHandle = *pHandle;
|
||||||
|
|
|
@ -671,9 +671,10 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
|
||||||
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -689,8 +690,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
|
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||||
|
|
|
@ -722,14 +722,14 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = tsSlotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
|
||||||
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->primaryTsIndex = tsSlotId;
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||||
pInfo->pDelIterator = NULL;
|
pInfo->pDelIterator = NULL;
|
||||||
|
|
|
@ -1556,7 +1556,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
|
|
||||||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||||
|
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1742,7 +1742,7 @@ static TSKEY sesionTs(void* pKey) {
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
||||||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||||
SStorageAPI* pApi) {
|
SStorageAPI* pApi, int32_t tsIndex) {
|
||||||
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
|
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
|
||||||
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
|
||||||
pSup->gap = gap;
|
pSup->gap = gap;
|
||||||
|
@ -1758,7 +1758,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
||||||
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
*(pSup->pState) = *pState;
|
*(pSup->pState) = *pState;
|
||||||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
|
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||||
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
|
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
|
||||||
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
|
||||||
|
@ -1767,25 +1767,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||||
|
|
||||||
int32_t pageSize = 4096;
|
|
||||||
while (pageSize < pSup->resultRowSize * 4) {
|
|
||||||
pageSize <<= 1u;
|
|
||||||
}
|
|
||||||
// at least four pages need to be in buffer
|
|
||||||
int32_t bufSize = 4096 * 256;
|
|
||||||
if (bufSize <= pageSize) {
|
|
||||||
bufSize = pageSize * 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!osTempSpaceAvailable()) {
|
|
||||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
|
||||||
qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
|
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSup->pSessionAPI = pApi;
|
pSup->pSessionAPI = pApi;
|
||||||
|
@ -3008,16 +2991,16 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
|
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
|
||||||
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
|
||||||
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
|
|
||||||
if (pSessionNode->window.pTsEnd) {
|
if (pSessionNode->window.pTsEnd) {
|
||||||
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
|
||||||
}
|
}
|
||||||
|
@ -3240,7 +3223,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
}
|
}
|
||||||
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
|
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
|
||||||
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||||
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i);
|
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
|
||||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3917,14 +3900,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
}
|
}
|
||||||
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
|
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
|
||||||
int16_t type = pColNode->node.resType.type;
|
int16_t type = pColNode->node.resType.type;
|
||||||
|
pInfo->primaryTsIndex = tsSlotId;
|
||||||
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
|
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
|
||||||
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
|
||||||
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
|
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->primaryTsIndex = tsSlotId;
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||||
pInfo->pDelIterator = NULL;
|
pInfo->pDelIterator = NULL;
|
||||||
|
@ -4161,7 +4143,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
|
||||||
|
|
|
@ -3600,10 +3600,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
|
||||||
|
|
||||||
SWinKey key = {0};
|
SWinKey key = {0};
|
||||||
if (pCtx->saveHandle.pBuf == NULL) {
|
if (pCtx->saveHandle.pBuf == NULL) {
|
||||||
SColumnInfoData* pColInfo = pCtx->input.pPTS;
|
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex);
|
||||||
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
|
||||||
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
|
|
||||||
}
|
|
||||||
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
key.groupId = pSrcBlock->info.id.groupId;
|
key.groupId = pSrcBlock->info.id.groupId;
|
||||||
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
|
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;
|
||||||
|
|
|
@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) {
|
||||||
|
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
||||||
startComplete_fn_t fn) {
|
startComplete_fn_t fn) {
|
||||||
int32_t code = -1;
|
|
||||||
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) {
|
||||||
|
|
||||||
taosHashCleanup(pMeta->pTasksMap);
|
taosHashCleanup(pMeta->pTasksMap);
|
||||||
taosHashCleanup(pMeta->pTaskDbUnique);
|
taosHashCleanup(pMeta->pTaskDbUnique);
|
||||||
taosHashCleanup(pMeta->pUpdateTaskSet);
|
|
||||||
taosHashCleanup(pMeta->updateInfo.pTasks);
|
taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||||
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
||||||
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
||||||
|
|
|
@ -202,6 +202,13 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
|
||||||
|
SWinKey* pTmpkey = pKey;
|
||||||
|
ASSERT(keyLen == sizeof(SWinKey));
|
||||||
|
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
|
||||||
|
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||||
SSessionKey* pKey = pPos->pKey;
|
SSessionKey* pKey = pPos->pKey;
|
||||||
|
|
|
@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
||||||
|
|
||||||
// halt it self for count window stream task until the related
|
// halt it self for count window stream task until the related fill history task completed.
|
||||||
// fill history task completd.
|
|
||||||
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||||
|
|
|
@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) {
|
||||||
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
memcpy(buf + len - rowSize, value, vLen);
|
memcpy(buf + len - rowSize, value, vLen);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
#else
|
#else
|
||||||
|
@ -290,11 +290,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
|
||||||
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
void* pVal = NULL;
|
void* pVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = getRowStateRowSize(pState->pFileState);
|
||||||
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
|
||||||
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
|
||||||
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
|
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
|
||||||
*ppVal = buf + len - rowSize;
|
*ppVal = buf + len - rowSize;
|
||||||
|
streamStateReleaseBuf(pState, pVal, false);
|
||||||
return code;
|
return code;
|
||||||
#else
|
#else
|
||||||
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
|
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
|
||||||
|
@ -332,7 +333,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
|
||||||
|
|
||||||
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
|
||||||
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
|
||||||
streamFileStateReleaseBuff(pState->pFileState, pos, false);
|
streamStateReleaseBuf(pState, pos, false);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,7 +396,10 @@ int32_t streamStateClear(SStreamState* pState) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
|
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) {
|
||||||
|
pState->number = number;
|
||||||
|
pState->tsIndex = tsIdex;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
|
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
|
|
|
@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
|
||||||
|
|
||||||
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
|
||||||
epsetAssign(&pTask->info.epSet, pEpSet);
|
epsetAssign(&pTask->info.epSet, pEpSet);
|
||||||
EPSET_TO_STR(pEpSet, buf)
|
epsetToStr(pEpSet, buf, tListLen(buf));
|
||||||
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
|
||||||
|
|
||||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
EPSET_TO_STR(pEpSet, buf);
|
epsetToStr(pEpSet, buf, tListLen(buf));
|
||||||
|
|
||||||
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
|
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
||||||
|
@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
|
||||||
|
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
|
||||||
char buf[512] = {0};
|
char buf[512] = {0};
|
||||||
EPSET_TO_STR(pEpSet, buf);
|
epsetToStr(pEpSet, buf, tListLen(buf));
|
||||||
int32_t id = pTask->id.taskId;
|
int32_t id = pTask->id.taskId;
|
||||||
|
|
||||||
int8_t type = pTask->outputInfo.type;
|
int8_t type = pTask->outputInfo.type;
|
||||||
|
@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
|
||||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
|
|
||||||
// double check
|
taosThreadMutexLock(&pTask->lock);
|
||||||
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
||||||
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
|
ret = true;
|
||||||
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
|
||||||
ret = true;
|
|
||||||
}
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
}
|
}
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,8 @@ struct SStreamFileState {
|
||||||
_state_file_remove_fn stateFileRemoveFn;
|
_state_file_remove_fn stateFileRemoveFn;
|
||||||
_state_file_get_fn stateFileGetFn;
|
_state_file_get_fn stateFileGetFn;
|
||||||
_state_file_clear_fn stateFileClearFn;
|
_state_file_clear_fn stateFileClearFn;
|
||||||
|
|
||||||
|
_state_fun_get_fn stateFunctionGetFn;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef SRowBuffPos SRowBuffInfo;
|
typedef SRowBuffPos SRowBuffInfo;
|
||||||
|
@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
||||||
pFileState->stateFileGetFn = intervalFileGetFn;
|
pFileState->stateFileGetFn = intervalFileGetFn;
|
||||||
pFileState->stateFileClearFn = streamStateClear_rocksdb;
|
pFileState->stateFileClearFn = streamStateClear_rocksdb;
|
||||||
pFileState->cfName = taosStrdup("state");
|
pFileState->cfName = taosStrdup("state");
|
||||||
|
pFileState->stateFunctionGetFn = getRowBuff;
|
||||||
} else {
|
} else {
|
||||||
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
|
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
|
||||||
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
|
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
|
||||||
|
@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
|
||||||
pFileState->stateFileGetFn = sessionFileGetFn;
|
pFileState->stateFileGetFn = sessionFileGetFn;
|
||||||
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
|
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
|
||||||
pFileState->cfName = taosStrdup("sess");
|
pFileState->cfName = taosStrdup("sess");
|
||||||
|
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
|
||||||
|
@ -738,7 +742,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
|
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
|
||||||
|
|
||||||
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
|
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
|
||||||
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
|
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
|
||||||
|
@ -756,3 +760,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
|
||||||
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
|
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
|
||||||
|
|
||||||
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
|
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
|
||||||
|
|
||||||
|
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
|
||||||
|
return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen);
|
||||||
|
}
|
||||||
|
|
|
@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
char tbuf[512] = {0};
|
char tbuf[512] = {0};
|
||||||
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
|
||||||
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
|
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
|
||||||
pCtx->retryNextInterval);
|
pCtx->retryNextInterval);
|
||||||
return;
|
return;
|
||||||
|
@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
if (hasEpSet) {
|
if (hasEpSet) {
|
||||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||||
char tbuf[512] = {0};
|
char tbuf[512] = {0};
|
||||||
EPSET_TO_STR(&pCtx->epSet, tbuf);
|
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
|
||||||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,7 +174,9 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
||||||
int32_t ival = taosStrHumanToInt32(value);
|
int32_t ival;
|
||||||
|
int32_t code = taosStrHumanToInt32(value, &ival);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
if (ival < pItem->imin || ival > pItem->imax) {
|
if (ival < pItem->imin || ival > pItem->imax) {
|
||||||
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||||
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
|
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
|
||||||
|
@ -188,7 +190,9 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
||||||
int64_t ival = taosStrHumanToInt64(value);
|
int64_t ival;
|
||||||
|
int32_t code = taosStrHumanToInt64(value, &ival);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
if (ival < pItem->imin || ival > pItem->imax) {
|
if (ival < pItem->imin || ival > pItem->imax) {
|
||||||
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||||
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
|
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
|
||||||
|
@ -202,15 +206,16 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
|
||||||
float fval = (float)atof(value);
|
double dval;
|
||||||
if (fval < pItem->fmin || fval > pItem->fmax) {
|
int32_t code = parseCfgReal(value, &dval);
|
||||||
|
if (dval < pItem->fmin || dval > pItem->fmax) {
|
||||||
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype),
|
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype),
|
||||||
cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax);
|
cfgStypeStr(stype), dval, pItem->fmin, pItem->fmax);
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pItem->fval = fval;
|
pItem->fval = (float)dval;
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -408,7 +413,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case CFG_DTYPE_INT32: {
|
case CFG_DTYPE_INT32: {
|
||||||
int32_t ival = (int32_t)taosStrHumanToInt32(pVal);
|
int32_t ival;
|
||||||
|
int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
if (ival < pItem->imin || ival > pItem->imax) {
|
if (ival < pItem->imin || ival > pItem->imax) {
|
||||||
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||||
|
@ -417,7 +424,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case CFG_DTYPE_INT64: {
|
case CFG_DTYPE_INT64: {
|
||||||
int64_t ival = (int64_t)taosStrHumanToInt64(pVal);
|
int64_t ival;
|
||||||
|
int32_t code = taosStrHumanToInt64(pVal, &ival);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
if (ival < pItem->imin || ival > pItem->imax) {
|
if (ival < pItem->imin || ival > pItem->imax) {
|
||||||
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
|
||||||
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
|
||||||
|
@ -427,9 +436,11 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
|
||||||
} break;
|
} break;
|
||||||
case CFG_DTYPE_FLOAT:
|
case CFG_DTYPE_FLOAT:
|
||||||
case CFG_DTYPE_DOUBLE: {
|
case CFG_DTYPE_DOUBLE: {
|
||||||
float fval = (float)atof(pVal);
|
double dval;
|
||||||
if (fval < pItem->fmin || fval > pItem->fmax) {
|
int32_t code = parseCfgReal(pVal, &dval);
|
||||||
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval,
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
|
if (dval < pItem->fmin || dval > pItem->fmax) {
|
||||||
|
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval,
|
||||||
pItem->fmin, pItem->fmax);
|
pItem->fmin, pItem->fmax);
|
||||||
terrno = TSDB_CODE_OUT_OF_RANGE;
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -23,45 +23,74 @@
|
||||||
#define UNIT_ONE_PEBIBYTE (UNIT_ONE_TEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
|
#define UNIT_ONE_PEBIBYTE (UNIT_ONE_TEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
|
||||||
#define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
|
#define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
|
||||||
|
|
||||||
int64_t taosStrHumanToInt64(const char* str) {
|
static int32_t parseCfgIntWithUnit(const char* str, double *res) {
|
||||||
size_t sLen = strlen(str);
|
double val, temp = INT64_MAX;
|
||||||
if (sLen < 2) return atoll(str);
|
char* endPtr;
|
||||||
|
errno = 0;
|
||||||
int64_t val = 0;
|
val = taosStr2Int64(str, &endPtr, 0);
|
||||||
|
if (*endPtr == '.' || errno == ERANGE) {
|
||||||
char* strNoUnit = NULL;
|
errno = 0;
|
||||||
char unit = str[sLen - 1];
|
val = taosStr2Double(str, &endPtr);
|
||||||
if ((unit == 'P') || (unit == 'p')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE;
|
|
||||||
} else if ((unit == 'T') || (unit == 't')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE;
|
|
||||||
} else if ((unit == 'G') || (unit == 'g')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
|
|
||||||
} else if ((unit == 'M') || (unit == 'm')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
|
|
||||||
} else if ((unit == 'K') || (unit == 'k')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
|
|
||||||
} else {
|
|
||||||
val = atoll(str);
|
|
||||||
}
|
}
|
||||||
|
if (endPtr == str || errno == ERANGE || isnan(val)) {
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG_VALUE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
while (isspace((unsigned char)*endPtr)) endPtr++;
|
||||||
|
uint64_t factor = 1;
|
||||||
|
if (*endPtr != '\0') {
|
||||||
|
switch (*endPtr) {
|
||||||
|
case 'P':
|
||||||
|
case 'p': {
|
||||||
|
temp /= UNIT_ONE_PEBIBYTE;
|
||||||
|
factor = UNIT_ONE_PEBIBYTE;
|
||||||
|
} break;
|
||||||
|
case 'T':
|
||||||
|
case 't': {
|
||||||
|
temp /= UNIT_ONE_TEBIBYTE;
|
||||||
|
factor = UNIT_ONE_TEBIBYTE;
|
||||||
|
} break;
|
||||||
|
case 'G':
|
||||||
|
case 'g': {
|
||||||
|
temp /= UNIT_ONE_GIBIBYTE;
|
||||||
|
factor = UNIT_ONE_GIBIBYTE;
|
||||||
|
} break;
|
||||||
|
case 'M':
|
||||||
|
case 'm': {
|
||||||
|
temp /= UNIT_ONE_MEBIBYTE;
|
||||||
|
factor = UNIT_ONE_MEBIBYTE;
|
||||||
|
} break;
|
||||||
|
case 'K':
|
||||||
|
case 'k': {
|
||||||
|
temp /= UNIT_ONE_KIBIBYTE;
|
||||||
|
factor = UNIT_ONE_KIBIBYTE;
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG_VALUE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if ((val > 0 && val > temp) || (val < 0 && val < -temp)) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
endPtr++;
|
||||||
|
val *= factor;
|
||||||
|
}
|
||||||
|
while (isspace((unsigned char)*endPtr)) endPtr++;
|
||||||
|
if (*endPtr) {
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG_VALUE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
val = rint(val);
|
||||||
|
*res = val;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(strNoUnit);
|
int32_t taosStrHumanToInt64(const char* str, int64_t *out) {
|
||||||
return val;
|
double res;
|
||||||
|
int32_t code = parseCfgIntWithUnit(str, &res);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) *out = (int64_t)res;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
|
@ -83,35 +112,17 @@ void taosInt64ToHumanStr(int64_t val, char* outStr) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t taosStrHumanToInt32(const char* str) {
|
int32_t taosStrHumanToInt32(const char* str, int32_t* out) {
|
||||||
size_t sLen = strlen(str);
|
double res;
|
||||||
if (sLen < 2) return atoll(str);
|
int32_t code = parseCfgIntWithUnit(str, &res);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
int32_t val = 0;
|
if (res < INT32_MIN || res > INT32_MAX) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
char* strNoUnit = NULL;
|
return -1;
|
||||||
char unit = str[sLen - 1];
|
}
|
||||||
if ((unit == 'G') || (unit == 'g')) {
|
*out = (int32_t)res;
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
|
|
||||||
} else if ((unit == 'M') || (unit == 'm')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
|
|
||||||
} else if ((unit == 'K') || (unit == 'k')) {
|
|
||||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
|
||||||
memcpy(strNoUnit, str, sLen - 1);
|
|
||||||
|
|
||||||
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
|
|
||||||
} else {
|
|
||||||
val = atoll(str);
|
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
taosMemoryFree(strNoUnit);
|
|
||||||
return val;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
|
|
|
@ -496,3 +496,21 @@ size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rs
|
||||||
|
|
||||||
return index;
|
return index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t parseCfgReal(const char* str, double* out) {
|
||||||
|
double val;
|
||||||
|
char *endPtr;
|
||||||
|
errno = 0;
|
||||||
|
val = taosStr2Double(str, &endPtr);
|
||||||
|
if (str == endPtr || errno == ERANGE || isnan(val)) {
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG_VALUE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
while(isspace((unsigned char)*endPtr)) endPtr++;
|
||||||
|
if (*endPtr != '\0') {
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG_VALUE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*out = val;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
|
@ -372,17 +372,22 @@ print step4=============
|
||||||
|
|
||||||
sql create database test6 vgroups 4;
|
sql create database test6 vgroups 4;
|
||||||
sql use test6;
|
sql use test6;
|
||||||
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
|
sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
|
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
|
||||||
|
sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s);
|
||||||
|
sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s);
|
||||||
|
sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a);
|
||||||
|
sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9;
|
||||||
|
sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2);
|
||||||
|
|
||||||
sleep 1000
|
sleep 1000
|
||||||
|
|
||||||
sql insert into t1 values(1648791211000,1,2,3,1.0);
|
sql insert into t1 values(1648791211000,1,2,3,0);
|
||||||
sql insert into t1 values(1648791213000,2,3,4,1.1);
|
sql insert into t1 values(1648791213000,2,3,4,0);
|
||||||
sql insert into t2 values(1648791215000,3,4,5,1.1);
|
sql insert into t2 values(1648791215000,3,4,5,0);
|
||||||
sql insert into t2 values(1648791217000,4,5,6,1.1);
|
sql insert into t2 values(1648791217000,4,5,6,0);
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
||||||
|
|
|
@ -240,6 +240,49 @@ class TDTestCase:
|
||||||
self.show_create_sysdb_sql()
|
self.show_create_sysdb_sql()
|
||||||
self.show_create_systb_sql()
|
self.show_create_systb_sql()
|
||||||
self.show_column_name()
|
self.show_column_name()
|
||||||
|
self.test_show_variables()
|
||||||
|
|
||||||
|
def get_variable(self, name: str, local: bool = True):
|
||||||
|
if local:
|
||||||
|
sql = 'show local variables'
|
||||||
|
else:
|
||||||
|
sql = f'select `value` from information_schema.ins_dnode_variables where name like "{name}"'
|
||||||
|
tdSql.query(sql, queryTimes=1)
|
||||||
|
res = tdSql.queryResult
|
||||||
|
if local:
|
||||||
|
for row in res:
|
||||||
|
if row[0] == name:
|
||||||
|
return row[1]
|
||||||
|
else:
|
||||||
|
if len(res) > 0:
|
||||||
|
return res[0][0]
|
||||||
|
raise Exception(f"variable {name} not found")
|
||||||
|
|
||||||
|
def test_show_variables(self):
|
||||||
|
epsion = 0.0000001
|
||||||
|
var = 'minimalTmpDirGB'
|
||||||
|
expect_val: float = 10.11
|
||||||
|
sql = f'ALTER LOCAL "{var}" "{expect_val}"'
|
||||||
|
tdSql.execute(sql)
|
||||||
|
val: float = float(self.get_variable(var))
|
||||||
|
if val != expect_val:
|
||||||
|
tdLog.exit(f'failed to set local {var} to {expect_val} actually {val}')
|
||||||
|
|
||||||
|
error_vals = ['a', '10a', '', '1.100r', '1.12 r']
|
||||||
|
for error_val in error_vals:
|
||||||
|
tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"')
|
||||||
|
|
||||||
|
var = 'supportVnodes'
|
||||||
|
expect_val = 1240 ## 1.211111 * 1024
|
||||||
|
sql = f'ALTER DNODE 1 "{var}" "1.211111k"'
|
||||||
|
tdSql.execute(sql, queryTimes=1)
|
||||||
|
val = int(self.get_variable(var, False))
|
||||||
|
if val != expect_val:
|
||||||
|
tdLog.exit(f'failed to set dnode {var} to {expect_val} actually {val}')
|
||||||
|
|
||||||
|
error_vals = ['a', '10a', '', '1.100r', '1.12 r', '5k']
|
||||||
|
for error_val in error_vals:
|
||||||
|
tdSql.error(f'ALTER DNODE 1 "{var}" "{error_val}"')
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue