Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-tmq

This commit is contained in:
wangmm0220 2024-04-11 16:21:14 +08:00
commit f609a45dd2
49 changed files with 562 additions and 369 deletions

View File

@ -29,21 +29,7 @@ typedef struct SCorEpSet {
#define GET_ACTIVE_EP(_eps) (&((_eps)->eps[(_eps)->inUse]))
#define EPSET_TO_STR(_eps, tbuf) \
do { \
int len = snprintf((tbuf), sizeof(tbuf), "epset:{"); \
for (int _i = 0; _i < (_eps)->numOfEps; _i++) { \
if (_i == (_eps)->numOfEps - 1) { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} else { \
len += \
snprintf((tbuf) + len, sizeof(tbuf) - len, "%d. %s:%d, ", _i, (_eps)->eps[_i].fqdn, (_eps)->eps[_i].port); \
} \
} \
len += snprintf((tbuf) + len, sizeof(tbuf) - len, "}, inUse:%d", (_eps)->inUse); \
} while (0);
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t len);
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);

View File

@ -333,7 +333,7 @@ typedef struct SStateStore {
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateClear)(SStreamState* pState);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number, int32_t tsIdex);
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);

View File

@ -29,6 +29,7 @@ struct SResultRowEntryInfo;
struct SFunctionNode;
typedef struct SScalarParam SScalarParam;
typedef struct SStreamState SStreamState;
typedef struct SFuncExecEnv {
int32_t calcMemSize;
@ -127,7 +128,7 @@ typedef struct SInputColumnInfoData {
typedef struct SSerializeDataHandle {
struct SDiskbasedBuf *pBuf;
int32_t currentPage;
void *pState;
SStreamState *pState;
} SSerializeDataHandle;
// incremental state storage
@ -165,7 +166,7 @@ typedef struct STdbState {
void *txn;
} STdbState;
typedef struct {
struct SStreamState {
STdbState *pTdbState;
struct SStreamFileState *pFileState;
int32_t number;
@ -174,7 +175,8 @@ typedef struct {
int64_t streamId;
int64_t streamBackendRid;
int8_t dump;
} SStreamState;
int32_t tsIndex;
};
typedef struct SFunctionStateStore {
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);

View File

@ -46,7 +46,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number);
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);

View File

@ -516,7 +516,6 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo;
STaskUpdateInfo updateInfo;
SHashObj* pUpdateTaskSet;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int64_t rid;

View File

@ -31,7 +31,6 @@ typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot;
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_put_fn)(void* pRowBuff, const void* pKey, size_t keyLen, const void* data, size_t dataLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
@ -41,6 +40,8 @@ typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const voi
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
@ -64,7 +65,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
void* getRowStateBuff(SStreamFileState* pFileState);
@ -105,6 +106,10 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen);
int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
//function
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
#ifdef __cplusplus
}
#endif

View File

@ -767,6 +767,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673)
#define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674)
#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675)
#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE TAOS_DEF_ERROR_CODE(0, 0x2676)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -22,10 +22,10 @@
extern "C" {
#endif
int64_t taosStrHumanToInt64(const char* str);
int32_t taosStrHumanToInt64(const char* str, int64_t* out);
void taosInt64ToHumanStr(int64_t val, char* outStr);
int32_t taosStrHumanToInt32(const char* str);
int32_t taosStrHumanToInt32(const char* str, int32_t* out);
void taosInt32ToHumanStr(int32_t val, char* outStr);
#ifdef __cplusplus

View File

@ -56,6 +56,8 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
int32_t parseCfgReal(const char* str, double* out);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
tMD5Init(&context);

View File

@ -1331,6 +1331,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
return NULL;
}
if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
taosMemoryFreeClear(pBlock->info.pks[0].pData);
taosMemoryFreeClear(pBlock->info.pks[1].pData);
}
blockDataFreeRes(pBlock);
taosMemoryFreeClear(pBlock);
return NULL;

View File

@ -101,16 +101,18 @@ typedef struct {
int32_t kvRowSize;
} SRowBuildScanInfo;
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
static FORCE_INLINE int32_t tRowBuildScanAddNone(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE;
sinfo->numOfNone++;
return 0;
}
static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
static FORCE_INLINE int32_t tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
if ((pTColumn->flags & COL_IS_KEY)) return TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
sinfo->numOfNull++;
sinfo->kvMaxOffset = sinfo->kvPayloadSize;
sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId);
return 0;
}
static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) {
@ -142,6 +144,7 @@ static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal
}
static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) {
int32_t code = 0;
int32_t colValIndex = 1;
int32_t numOfColVals = TARRAY_SIZE(colVals);
SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals);
@ -158,7 +161,7 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
for (int32_t i = 1; i < schema->numOfCols; i++) {
for (;;) {
if (colValIndex >= numOfColVals) {
tRowBuildScanAddNone(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
break;
}
@ -168,15 +171,15 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) {
tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i);
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) {
tRowBuildScanAddNull(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNull(sinfo, schema->columns + i))) goto _exit;
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) {
tRowBuildScanAddNone(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
}
colValIndex++;
break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) {
tRowBuildScanAddNone(sinfo, schema->columns + i);
if ((code = tRowBuildScanAddNone(sinfo, schema->columns + i))) goto _exit;
break;
} else { // skip useless value
colValIndex++;
@ -250,7 +253,8 @@ static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildS
+ sinfo->kvIndexSize // index array
+ sinfo->kvPayloadSize; // payload
return 0;
_exit:
return code;
}
static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema,

View File

@ -70,6 +70,7 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
}
}
void epAssign(SEp* pDst, SEp* pSrc) {
if (pSrc == NULL || pDst == NULL) {
return;
@ -78,6 +79,7 @@ void epAssign(SEp* pDst, SEp* pSrc) {
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
pDst->port = pSrc->port;
}
void epsetSort(SEpSet* pDst) {
if (pDst->numOfEps <= 1) {
return;
@ -127,6 +129,34 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) {
return ep;
}
int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t bufLen) {
int len = snprintf(pBuf, bufLen, "epset:{");
if (len < 0) {
return -1;
}
for (int _i = 0; (_i < pEpSet->numOfEps) && (bufLen > len); _i++) {
int32_t ret = 0;
if (_i == pEpSet->numOfEps - 1) {
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
} else {
ret = snprintf(pBuf + len, bufLen - len, "%d. %s:%d, ", _i, pEpSet->eps[_i].fqdn, pEpSet->eps[_i].port);
}
if (ret < 0) {
return -1;
}
len += ret;
}
if (len < bufLen) {
/*len += */snprintf(pBuf + len, bufLen - len, "}, inUse:%d", pEpSet->inUse);
}
return TSDB_CODE_SUCCESS;
}
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return -1;

View File

@ -1788,7 +1788,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
EPSET_TO_STR(&pCurrent->epset, buf);
epsetToStr(&pCurrent->epset, buf, tListLen(buf));
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
@ -1939,7 +1940,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
taosArrayPush(plist, pEntry);
char buf[256] = {0};
EPSET_TO_STR(&pEntry->epset, buf);
epsetToStr(&pEntry->epset, buf, tListLen(buf));
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
}
taosHashCleanup(pHash);

View File

@ -114,7 +114,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
}
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
@ -133,7 +133,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
entry.nodeId = SNODE_HANDLE;
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
@ -302,7 +302,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
}
char buf[256] = {0};
EPSET_TO_STR(&epset, buf);
epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);

View File

@ -30,11 +30,11 @@
extern "C" {
#endif
typedef struct SSnode {
struct SSnode {
char* path;
SStreamMeta* pMeta;
SMsgCb msgCb;
} SSnode;
};
#if 0
typedef struct {

View File

@ -622,8 +622,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
}
SRow* pRow = NULL;
tqInfo("result column flag:%d", pTSchema->columns[1].flags);
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);

View File

@ -506,6 +506,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
}
// extract the required source task for a given stream, identified by streamId
streamMetaRLock(pMeta);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
if (pId->streamId != streamId) {
@ -557,5 +561,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
walCloseReader(pReader);
}
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}

View File

@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
int32_t vgId = pMeta->vgId;
bool scanWal = false;
streamMetaWLock(pMeta);
if (pStartInfo->taskStarting == 1) {
@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
pStartInfo->restartCount = 0;
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
}
scanWal = true;
}
}
streamMetaWUnLock(pMeta);
if (scanWal && (vgId != SNODE_HANDLE)) {
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
tqScanWalAsync(pMeta->ahandle, true);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -57,7 +57,6 @@ static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, c
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = TSKEY_MIN;
@ -108,11 +107,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
}
}
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it
p->hasResult = true;
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
}
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (idx < funcTypeBlockArray->size) {
@ -233,6 +233,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
if (IS_VAR_DATA_TYPE(pPkCol->type)) {
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
}
p->pkColumn = *pPkCol;
}
if (numOfTables == 0) {
@ -366,15 +368,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end;
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes;
if (slotIds[j] == -1) {
bytes = 1;
} else {
bytes = pr->pSchema->columns[slotIds[j]].bytes;
}
int32_t pkBufLen = 0;
if (pr->rowKey.numOfPKs > 0) {
pkBufLen = pr->pkColumn.bytes;
}
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}

View File

@ -53,6 +53,13 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList,
return pLoadInfo;
}
static void freeItem(void* pValue) {
SValue* p = (SValue*) pValue;
if (IS_VAR_DATA_TYPE(p->type)) {
taosMemoryFree(p->pData);
}
}
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo == NULL) {
return NULL;
@ -72,8 +79,8 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo->info.pCount != NULL) {
taosArrayDestroy(pLoadInfo->info.pUid);
taosArrayDestroy(pLoadInfo->info.pFirstKey);
taosArrayDestroy(pLoadInfo->info.pLastKey);
taosArrayDestroyEx(pLoadInfo->info.pFirstKey, freeItem);
taosArrayDestroyEx(pLoadInfo->info.pLastKey, freeItem);
taosArrayDestroy(pLoadInfo->info.pCount);
taosArrayDestroy(pLoadInfo->info.pFirstTs);
taosArrayDestroy(pLoadInfo->info.pLastTs);
@ -319,6 +326,21 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
return TSDB_CODE_SUCCESS;
}
static int32_t tValueDupPayload(SValue *pVal) {
if (IS_VAR_DATA_TYPE(pVal->type)) {
char *p = (char *)pVal->pData;
char *pBuf = taosMemoryMalloc(pVal->nData);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pBuf, p, pVal->nData);
pVal->pData = (uint8_t *)pBuf;
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
@ -377,37 +399,31 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
rows - i);
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i);
SValue vFirst = {0}, vLast = {0};
for (int32_t f = i; f < rows; ++f) {
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
if (code) {
break;
}
if (block.numOfPKs > 0) {
SValue vFirst = {0}, vLast = {0};
for (int32_t f = i; f < rows; ++f) {
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
if (code) {
break;
}
if (IS_VAR_DATA_TYPE(vFirst.type)) {
char *p = (char *)vFirst.pData;
char *pBuf = taosMemoryMalloc(vFirst.nData);
memcpy(pBuf, p, vFirst.nData);
vFirst.pData = (uint8_t *)pBuf;
}
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
tValueDupPayload(&vFirst);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
if (code) {
break;
}
// todo add api to clone the original data
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
if (code) {
break;
}
if (IS_VAR_DATA_TYPE(vLast.type)) {
char *p = (char *)vLast.pData;
char *pBuf = taosMemoryMalloc(vLast.nData);
memcpy(pBuf, p, vLast.nData);
vLast.pData = (uint8_t *)pBuf;
tValueDupPayload(&vLast);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
}
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
}
} else {
STbStatisRecord record;
STbStatisRecord record = {0};
while (i < rows) {
tStatisBlockGet(&block, i, &record);
if (record.suid != suid) {
@ -420,8 +436,18 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &record.firstKey.pks[0]);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &record.lastKey.pks[0]);
if (record.firstKey.numOfPKs > 0) {
SValue s = record.firstKey.pks[0];
tValueDupPayload(&s);
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
s = record.lastKey.pks[0];
tValueDupPayload(&s);
taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
}
i += 1;
}
}

View File

@ -59,7 +59,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange);
SVersionRange* pVerRange, bool hasPk);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
@ -174,12 +173,17 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
pKey->pks[i].type = indices[i].type;
uint8_t *tdata = data + indices[i].offset;
if (pRow->flag >> 4) {
tdata += tGetI16v(tdata, NULL);
}
if (IS_VAR_DATA_TYPE(indices[i].type)) {
tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData);
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData);
pKey->pks[i].pData += pKey->pks[i].nData;
} else {
pKey->pks[i].val = *(int64_t*) (data + indices[i].offset);
memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes);
}
}
}
@ -392,19 +396,17 @@ _err:
return code;
}
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk) {
pIter->order = order;
pIter->index = -1;
pIter->numOfBlocks = 0;
if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} else {
taosArrayClear(pIter->blockList);
clearDataBlockIterator(pIter, hasPk);
}
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->pTableIter = NULL;
pStatus->loadFromFile = true;
@ -657,21 +659,19 @@ _end:
return code;
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum,
SArray* pTableScanInfoList) {
size_t sizeInDisk = 0;
int64_t st = taosGetTimestampUs();
static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum,
SArray* pTableScanInfoList) {
int32_t k = 0;
size_t sizeInDisk = 0;
int64_t st = taosGetTimestampUs();
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
STimeWindow w = pReader->info.window;
SBrinRecord* pRecord = NULL;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SBrinRecordIter iter = {0};
// clear info for the new file
cleanupInfoForNextFileset(pReader->status.pTableMap);
int32_t k = 0;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
STimeWindow w = pReader->info.window;
SBrinRecord* pRecord = NULL;
SBrinRecordIter iter = {0};
initBrinRecordIter(&iter, pReader->pFileReader, pIndexList);
while (((pRecord = getNextBrinRecord(&iter)) != NULL)) {
@ -743,14 +743,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
if (pScanInfo->pBlockList == NULL) {
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord));
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
if (pScanInfo->pBlockList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord);
if (pScanInfo->pBlockIdxList == NULL) {
pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx));
if (pScanInfo->pBlockIdxList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)};
recordToBlockInfo(&blockInfo, pRecord);
void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo);
if (p1 == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// todo: refactor to record the fileset skey/ekey
if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) {
pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts;
}
@ -1323,10 +1336,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
}
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order,
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) {
bool asc = ASCENDING_TRAVERSE(order);
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) {
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) {
return false;
}
@ -1334,9 +1349,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
return false;
}
int32_t step = asc ? 1 : -1;
STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
blockInfoToRecord(pRecord, p);
@ -1344,22 +1357,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
return true;
}
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
int32_t index = pBlockIter->index;
while (index < pBlockIter->numOfBlocks && index >= 0) {
SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
return index;
}
index += step;
}
return -1;
}
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index,
int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) {
@ -1595,7 +1592,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
tColRowGetKeyDeepCopy(pRow->pBlockData, pRow->iRow, pkSrcSlot, pNextProc);
if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) {
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) {
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
pSttBlockReader->numOfPks > 0)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
}
@ -2135,7 +2133,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) {
bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order,
&pInfo->verRange);
&pInfo->verRange, pReader->suppInfo.numOfPks > 0);
if (dropped) {
return false;
}
@ -2705,7 +2703,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
}
if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) {
code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList);
code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList);
return code;
@ -3153,23 +3151,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
if (pBlockInfo) {
// todo handle
// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
// if (pScanInfo) {
// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey);
// lastKey = pScanInfo->lastProcKey;
// }
pDumpInfo->totalRows = pBlockInfo->numRow;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1;
} else {
pDumpInfo->totalRows = 0;
pDumpInfo->rowIndex = 0;
// pDumpInfo->lastKey.key.ts = lastKey;
}
pDumpInfo->allDumped = false;
// pDumpInfo->lastKey = lastKey;
}
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
@ -3194,7 +3183,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->info.order);
resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
resetTableListIndex(&pReader->status);
}
@ -3304,7 +3293,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->info.order);
resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
resetTableListIndex(&pReader->status);
ERetrieveType type = doReadDataFromSttFiles(pReader);
@ -3381,8 +3370,35 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
static int32_t reverseSearchStartPos(const SArray* pDelList, int32_t index, int64_t key, bool asc) {
size_t num = taosArrayGetSize(pDelList);
int32_t start = index;
if (asc) {
if (start >= num - 1) {
start = num - 1;
}
TSDBKEY* p = taosArrayGet(pDelList, start);
while (p->ts >= key && start > 0) {
start -= 1;
}
} else {
if (index <= 0) {
start = 0;
}
TSDBKEY* p = taosArrayGet(pDelList, start);
while (p->ts <= key && start < num - 1) {
start += 1;
}
}
return start;
}
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange) {
SVersionRange* pVerRange, bool hasPk) {
if (pDelList == NULL || (TARRAY_SIZE(pDelList) == 0)) {
return false;
}
@ -3391,6 +3407,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
if (hasPk) { // handle the case where duplicated timestamps existed.
*index = reverseSearchStartPos(pDelList, *index, key, asc);
}
if (asc) {
if (*index >= num - 1) {
TSDBKEY* last = taosArrayGetLast(pDelList);
@ -3503,7 +3523,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange);
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
if (!dropped) {
return pRow;
}
@ -3528,7 +3548,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange);
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange, pReader->suppInfo.numOfPks > 0);
if (!dropped) {
return pRow;
}
@ -4118,7 +4138,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
}
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order);
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) {
@ -4322,7 +4342,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFree(pSupInfo->colId);
tBlockDataDestroy(&pReader->status.fileBlockData);
cleanupDataBlockIterator(&pReader->status.blockIter);
cleanupDataBlockIterator(&pReader->status.blockIter, pReader->suppInfo.numOfPks > 0);
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (pReader->status.pTableMap != NULL) {
@ -4338,9 +4358,11 @@ void tsdbReaderClose2(STsdbReader* pReader) {
SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pSttBlockReader != NULL) {
SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader;
tMergeTreeClose(&pLReader->mergeTree);
taosMemoryFree(pLReader);
SSttBlockReader* pSttBlockReader = pFilesetIter->pSttBlockReader;
tMergeTreeClose(&pSttBlockReader->mergeTree);
clearRowKey(&pSttBlockReader->currentKey);
taosMemoryFree(pSttBlockReader);
}
destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
@ -4996,7 +5018,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader);
resetDataBlockIterator(pBlockIter, pReader->info.order);
resetDataBlockIterator(pBlockIter, pReader->info.order, pReader->suppInfo.numOfPks > 0);
resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);

View File

@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
return TSDB_CODE_SUCCESS;
}
void clearRowKey(SRowKey* pKey) {
if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) {
return;
}
taosMemoryFreeClear(pKey->pks[0].pData);
}
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
int32_t numOfPks = pReader->suppInfo.numOfPks;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList);
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
p->pFileDelData = taosArrayDestroy(p->pFileDelData);
clearRowKey(&p->lastProcKey);
clearRowKey(&p->sttRange.skey);
clearRowKey(&p->sttRange.ekey);
clearRowKey(&p->sttKeyInfo.nextProcKey);
}
void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) {
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
pBlockInfo->uid = record->uid;
pBlockInfo->firstKey = record->firstKey.key.ts;
pBlockInfo->lastKey = record->lastKey.key.ts;
@ -449,12 +461,40 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor
}
}
static void freePkItem(void* pItem) {
SFileDataBlockInfo* p = pItem;
taosMemoryFreeClear(p->firstPk.pData);
taosMemoryFreeClear(p->lastPk.pData);
}
void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk) {
pIter->index = -1;
pIter->numOfBlocks = 0;
if (hasPk) {
taosArrayClearEx(pIter->blockList, freePkItem);
} else {
taosArrayClear(pIter->blockList);
}
}
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk) {
pIter->index = -1;
pIter->numOfBlocks = 0;
if (hasPk) {
taosArrayDestroyEx(pIter->blockList, freePkItem);
} else {
taosArrayDestroy(pIter->blockList);
}
}
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SBlockOrderSupporter sup = {0};
clearDataBlockIterator(pBlockIter, pReader->suppInfo.numOfPks > 0);
pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList);
// access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = taosArrayGetSize(pTableList);
@ -482,9 +522,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
for (int32_t k = 0; k < num; ++k) {
SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k);
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k);
sup.pDataBlockInfo[sup.numOfTables][k] =
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo};
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo};
cnt++;
}
@ -499,20 +539,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
// since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
if (pTableScanInfo->pBlockIdxList == NULL) {
pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx));
}
for (int32_t i = 0; i < numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.tbBlockIdx = i};
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
recordToBlockInfo(&blockInfo, record, pReader);
taosArrayPush(pBlockIter->blockList, &blockInfo);
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
}
taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList);
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList);
int64_t et = taosGetTimestampUs();
@ -540,18 +572,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
int32_t pos = tMergeTreeGetChosenIndex(pTree);
int32_t index = sup.indexPerTable[pos]++;
SFileDataBlockInfo blockInfo = {.tbBlockIdx = index};
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
recordToBlockInfo(&blockInfo, record, pReader);
SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
taosArrayPush(pBlockIter->blockList, pBlockInfo);
taosArrayPush(pBlockIter->blockList, &blockInfo);
STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
if (pTableScanInfo->pBlockIdxList == NULL) {
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
}
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
// set data block index overflow, in order to disable the offset comparator
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;

View File

@ -237,7 +237,6 @@ typedef struct SDataBlockIter {
typedef struct SFileBlockDumpInfo {
int32_t totalRows;
int32_t rowIndex;
// int64_t lastKey;
// STsdbRowKey lastKey; // this key should be removed
bool allDumped;
} SFileBlockDumpInfo;
@ -338,6 +337,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
int32_t numOfTables);
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record);
void destroyLDataIter(SLDataIter* pIter);
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
@ -347,6 +347,11 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
void clearRowKey(SRowKey* pKey);
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk);
void clearDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk);
typedef struct {
SArray* pTombData;
@ -382,6 +387,7 @@ typedef struct SCacheRowsReader {
SArray* pFuncTypeList;
__compar_fn_t pkComparFn;
SRowKey rowKey;
SColumnInfo pkColumn;
} SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);

View File

@ -909,7 +909,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi);
SStorageAPI* pApi, int32_t tsIndex);
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
STimeWindowAggSupp* pTwSup);
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);

View File

@ -40,7 +40,7 @@ typedef struct SCacheRowsScanInfo {
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock* pBufferredRes;
SSDataBlock* pBufferedRes;
SArray* pUidList;
SArray* pCidList;
int32_t indexOfBufferedRes;
@ -160,9 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
capacity = TMIN(totalTables, 4096);
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode);
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity);
pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false);
setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
capacity = 1; // only one row output
@ -219,18 +219,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
blockDataCleanup(pInfo->pBufferredRes);
if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) {
blockDataCleanup(pInfo->pBufferedRes);
taosArrayClear(pInfo->pUidList);
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds,
int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// check for tag values
int32_t resultRows = pInfo->pBufferredRes->info.rows;
int32_t resultRows = pInfo->pBufferedRes->info.rows;
// the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
@ -239,12 +239,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) {
if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
int32_t slotId = pCol->info.slotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
@ -350,7 +350,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
void destroyCacheScanOperator(void* param) {
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes);
blockDataDestroy(pInfo->pBufferedRes);
taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList);

View File

@ -216,11 +216,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) {
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
@ -248,11 +243,6 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
goto _end;
}
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);

View File

@ -255,6 +255,7 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
pBlockInfo->pks[0].type = pInfoData->info.type;
@ -271,6 +272,9 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
taosMemoryFreeClear(pBlockInfo->pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlockInfo->pks[0].nData = pInfoData->info.bytes;
pBlockInfo->pks[1].nData = pInfoData->info.bytes;
}
}
}

View File

@ -3156,7 +3156,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateInfo = NULL;
pInfo->pTableScanOp = pTableScanOp;
if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1, pInfo->primaryTsIndex);
}
pInfo->readHandle = *pHandle;

View File

@ -671,9 +671,10 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error;
}
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -689,8 +690,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
pInfo->binfo.pRes = pResBlock;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);

View File

@ -722,14 +722,14 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error;
}
pInfo->primaryTsIndex = tsSlotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->primaryTsIndex = tsSlotId;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;

View File

@ -1556,7 +1556,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
@ -1742,7 +1742,7 @@ static TSKEY sesionTs(void* pKey) {
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi) {
SStorageAPI* pApi, int32_t tsIndex) {
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap;
@ -1758,7 +1758,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pSup->pState) = *pState;
pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
@ -1767,25 +1767,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pSup->pResultRows = tSimpleHashInit(32, hashFn);
int32_t pageSize = 4096;
while (pageSize < pSup->resultRowSize * 4) {
pageSize <<= 1u;
}
// at least four pages need to be in buffer
int32_t bufSize = 4096 * 256;
if (bufSize <= pageSize) {
bufSize = pageSize * 4;
}
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE;
qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir);
return terrno;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pExpSup->pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
}
pSup->pSessionAPI = pApi;
@ -3008,16 +2991,16 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
.deleteMark = getDeleteMark(&pSessionNode->window, 0),
};
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
if (pSessionNode->window.pTsEnd) {
pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId;
}
@ -3240,7 +3223,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
}
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i);
pAPI->stateStore.streamStateSetNumber(pChInfo->streamAggSup.pState, i, pInfo->primaryTsIndex);
taosArrayPush(pInfo->pChildren, &pChildOp);
}
}
@ -3917,14 +3900,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
}
int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes;
int16_t type = pColNode->node.resType.type;
pInfo->primaryTsIndex = tsSlotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->primaryTsIndex = tsSlotId;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;
@ -4161,7 +4143,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,

View File

@ -1339,23 +1339,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid);
} else { // start a new session window
SResultRow* pResult = NULL;
if (pRowSup->numOfRows > 0) { // handled data that belongs to the previous session window
SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
int32_t ret =
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);

View File

@ -2657,8 +2657,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first
// function.
// please ref. to the comment in lastRowFunction for the reason why disabling the opt version of last/first function.
#if 0
if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
@ -2709,6 +2708,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
}
#else
// todo refactor
if (!pInputCol->hasNull && !pCtx->hasPrimaryKey) {
numOfElems = 1;
@ -2790,7 +2791,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
}
// SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
@ -3600,10 +3600,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
SWinKey key = {0};
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = pCtx->input.pPTS;
if (!pColInfo || pColInfo->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
}
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, pCtx->saveHandle.pState->tsIndex);
ASSERT(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
key.groupId = pSrcBlock->info.id.groupId;
key.ts = *(int64_t*)colDataGetData(pColInfo, rowIndex);;

View File

@ -1657,9 +1657,6 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
}
if (pSchema->flags & COL_IS_KEY) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z);
}
pVal->flag = CV_FLAG_NULL;
return TSDB_CODE_SUCCESS;

View File

@ -267,11 +267,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
pBind = bind + c;
}
if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
if (code) {
goto _return;
@ -318,11 +313,6 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
pBind = bind;
}
if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) {
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);

View File

@ -8221,7 +8221,7 @@ static int32_t adjustOrderOfProjections(STranslateContext* pCxt, SNodeList** ppC
if (TSDB_CODE_SUCCESS == code && !hasPrimaryKey && hasPkInTable(pMeta)) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
"Primary key column of dest table can not be null");
"Primary key column name must be defined in existed-stable field");
}
SNodeList* pNewProjections = NULL;

View File

@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) {
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
startComplete_fn_t fn) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet);
taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);

View File

@ -202,6 +202,13 @@ _end:
return code;
}
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
SWinKey* pTmpkey = pKey;
ASSERT(keyLen == sizeof(SWinKey));
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen);
}
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pKey = pPos->pKey;

View File

@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
// halt it self for count window stream task until the related
// fill history task completd.
// halt it self for count window stream task until the related fill history task completed.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);

View File

@ -277,10 +277,10 @@ int32_t streamStateCommit(SStreamState* pState) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
int32_t len = getRowStateRowSize(pState->pFileState);
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return TSDB_CODE_SUCCESS;
#else
@ -290,11 +290,12 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void*
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
int32_t len = getRowStateRowSize(pState->pFileState);
int32_t code = getFunctionRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
uint32_t rowSize = streamFileStateGetSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
streamStateReleaseBuf(pState, pVal, false);
return code;
#else
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
@ -332,7 +333,7 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
streamFileStateReleaseBuff(pState->pFileState, pos, false);
streamStateReleaseBuf(pState, pos, false);
return code;
}
@ -395,7 +396,10 @@ int32_t streamStateClear(SStreamState* pState) {
#endif
}
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex) {
pState->number = number;
pState->tsIndex = tsIdex;
}
int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) {
#ifdef USE_ROCKSDB

View File

@ -35,7 +35,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
epsetToStr(pEpSet, buf, tListLen(buf));
stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
}
@ -592,7 +592,7 @@ int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstre
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
epsetToStr(pEpSet, buf, tListLen(buf));
int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
for (int32_t i = 0; i < numOfUpstream; ++i) {
@ -626,7 +626,7 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
char buf[512] = {0};
EPSET_TO_STR(pEpSet, buf);
epsetToStr(pEpSet, buf, tListLen(buf));
int32_t id = pTask->id.taskId;
int8_t type = pTask->outputInfo.type;
@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) {
bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
bool ret = false;
// double check
taosThreadMutexLock(&pTask->lock);
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
taosThreadMutexLock(&pTask->lock);
if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
taosThreadMutexUnlock(&pTask->lock);
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
taosThreadMutexUnlock(&pTask->lock);
return ret;
}

View File

@ -58,6 +58,8 @@ struct SStreamFileState {
_state_file_remove_fn stateFileRemoveFn;
_state_file_get_fn stateFileGetFn;
_state_file_clear_fn stateFileClearFn;
_state_fun_get_fn stateFunctionGetFn;
};
typedef SRowBuffPos SRowBuffInfo;
@ -157,6 +159,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->stateFileGetFn = intervalFileGetFn;
pFileState->stateFileClearFn = streamStateClear_rocksdb;
pFileState->cfName = taosStrdup("state");
pFileState->stateFunctionGetFn = getRowBuff;
} else {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
@ -168,6 +171,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->stateFileGetFn = sessionFileGetFn;
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
pFileState->cfName = taosStrdup("sess");
pFileState->stateFunctionGetFn = getSessionRowBuff;
}
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
@ -738,7 +742,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
return TSDB_CODE_SUCCESS;
}
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState) { return pFileState->selectivityRowSize; }
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
pFileState->flushMark = TMAX(pFileState->flushMark, ts);
@ -756,3 +760,7 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen);
}

View File

@ -2188,7 +2188,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep,
pCtx->retryNextInterval);
return;
@ -2421,7 +2421,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (hasEpSet) {
if (rpcDebugFlag & DEBUG_TRACE) {
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf));
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}

View File

@ -174,7 +174,9 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty
}
static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int32_t ival = taosStrHumanToInt32(value);
int32_t ival;
int32_t code = taosStrHumanToInt32(value, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
@ -188,7 +190,9 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st
}
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int64_t ival = taosStrHumanToInt64(value);
int64_t ival;
int32_t code = taosStrHumanToInt64(value, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
@ -202,15 +206,16 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st
}
static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
float fval = (float)atof(value);
if (fval < pItem->fmin || fval > pItem->fmax) {
double dval;
int32_t code = parseCfgReal(value, &dval);
if (dval < pItem->fmin || dval > pItem->fmax) {
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax);
cfgStypeStr(stype), dval, pItem->fmin, pItem->fmax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
pItem->fval = fval;
pItem->fval = (float)dval;
pItem->stype = stype;
return 0;
}
@ -408,7 +413,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
}
} break;
case CFG_DTYPE_INT32: {
int32_t ival = (int32_t)taosStrHumanToInt32(pVal);
int32_t ival;
int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
@ -417,7 +424,9 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
}
} break;
case CFG_DTYPE_INT64: {
int64_t ival = (int64_t)taosStrHumanToInt64(pVal);
int64_t ival;
int32_t code = taosStrHumanToInt64(pVal, &ival);
if (code != TSDB_CODE_SUCCESS) return code;
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax);
@ -427,9 +436,11 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
} break;
case CFG_DTYPE_FLOAT:
case CFG_DTYPE_DOUBLE: {
float fval = (float)atof(pVal);
if (fval < pItem->fmin || fval > pItem->fmax) {
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), fval,
double dval;
int32_t code = parseCfgReal(pVal, &dval);
if (code != TSDB_CODE_SUCCESS) return code;
if (dval < pItem->fmin || dval > pItem->fmax) {
uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval,
pItem->fmin, pItem->fmax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;

View File

@ -629,6 +629,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NONE, "Primary key column should not be none")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner

View File

@ -23,45 +23,74 @@
#define UNIT_ONE_PEBIBYTE (UNIT_ONE_TEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
#define UNIT_ONE_EXBIBYTE (UNIT_ONE_PEBIBYTE * UNIT_SIZE_CONVERT_FACTOR)
int64_t taosStrHumanToInt64(const char* str) {
size_t sLen = strlen(str);
if (sLen < 2) return atoll(str);
int64_t val = 0;
char* strNoUnit = NULL;
char unit = str[sLen - 1];
if ((unit == 'P') || (unit == 'p')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE;
} else if ((unit == 'T') || (unit == 't')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE;
} else if ((unit == 'G') || (unit == 'g')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
} else if ((unit == 'M') || (unit == 'm')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
} else if ((unit == 'K') || (unit == 'k')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
} else {
val = atoll(str);
static int32_t parseCfgIntWithUnit(const char* str, double *res) {
double val, temp = INT64_MAX;
char* endPtr;
errno = 0;
val = taosStr2Int64(str, &endPtr, 0);
if (*endPtr == '.' || errno == ERANGE) {
errno = 0;
val = taosStr2Double(str, &endPtr);
}
if (endPtr == str || errno == ERANGE || isnan(val)) {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
while (isspace((unsigned char)*endPtr)) endPtr++;
uint64_t factor = 1;
if (*endPtr != '\0') {
switch (*endPtr) {
case 'P':
case 'p': {
temp /= UNIT_ONE_PEBIBYTE;
factor = UNIT_ONE_PEBIBYTE;
} break;
case 'T':
case 't': {
temp /= UNIT_ONE_TEBIBYTE;
factor = UNIT_ONE_TEBIBYTE;
} break;
case 'G':
case 'g': {
temp /= UNIT_ONE_GIBIBYTE;
factor = UNIT_ONE_GIBIBYTE;
} break;
case 'M':
case 'm': {
temp /= UNIT_ONE_MEBIBYTE;
factor = UNIT_ONE_MEBIBYTE;
} break;
case 'K':
case 'k': {
temp /= UNIT_ONE_KIBIBYTE;
factor = UNIT_ONE_KIBIBYTE;
} break;
default:
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
if ((val > 0 && val > temp) || (val < 0 && val < -temp)) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
endPtr++;
val *= factor;
}
while (isspace((unsigned char)*endPtr)) endPtr++;
if (*endPtr) {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
val = rint(val);
*res = val;
return TSDB_CODE_SUCCESS;
}
taosMemoryFree(strNoUnit);
return val;
int32_t taosStrHumanToInt64(const char* str, int64_t *out) {
double res;
int32_t code = parseCfgIntWithUnit(str, &res);
if (code == TSDB_CODE_SUCCESS) *out = (int64_t)res;
return code;
}
#ifdef BUILD_NO_CALL
@ -83,35 +112,17 @@ void taosInt64ToHumanStr(int64_t val, char* outStr) {
}
#endif
int32_t taosStrHumanToInt32(const char* str) {
size_t sLen = strlen(str);
if (sLen < 2) return atoll(str);
int32_t val = 0;
char* strNoUnit = NULL;
char unit = str[sLen - 1];
if ((unit == 'G') || (unit == 'g')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
} else if ((unit == 'M') || (unit == 'm')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
} else if ((unit == 'K') || (unit == 'k')) {
strNoUnit = taosMemoryCalloc(sLen, 1);
memcpy(strNoUnit, str, sLen - 1);
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
} else {
val = atoll(str);
int32_t taosStrHumanToInt32(const char* str, int32_t* out) {
double res;
int32_t code = parseCfgIntWithUnit(str, &res);
if (code == TSDB_CODE_SUCCESS) {
if (res < INT32_MIN || res > INT32_MAX) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
*out = (int32_t)res;
}
taosMemoryFree(strNoUnit);
return val;
return code;
}
#ifdef BUILD_NO_CALL

View File

@ -496,3 +496,21 @@ size_t twcsncspn(const TdUcs4 *wcs, size_t size, const TdUcs4 *reject, size_t rs
return index;
}
int32_t parseCfgReal(const char* str, double* out) {
double val;
char *endPtr;
errno = 0;
val = taosStr2Double(str, &endPtr);
if (str == endPtr || errno == ERANGE || isnan(val)) {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
while(isspace((unsigned char)*endPtr)) endPtr++;
if (*endPtr != '\0') {
terrno = TSDB_CODE_INVALID_CFG_VALUE;
return -1;
}
*out = val;
return TSDB_CODE_SUCCESS;
}

View File

@ -372,17 +372,22 @@ print step4=============
sql create database test6 vgroups 4;
sql use test6;
sql create stable st(ts timestamp,a int,b int,c int,d double) tags(ta int,tb int,tc int);
sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams6 trigger at_once ignore expired 0 ignore update 0 into streamt6 as select _wstart, b, c,min(c), ta, tb from st interval(1s);
sql create stream streams7 trigger at_once ignore expired 0 ignore update 0 into streamt7 as select ts, max(c) from st interval(1s);
sql create stream streams8 trigger at_once ignore expired 0 ignore update 0 into streamt8 as select ts, b, c, last(c), ta, tb from st session(ts, 1s);
sql create stream streams9 trigger at_once ignore expired 0 ignore update 0 into streamt9 as select ts, b, c, last_row(c), ta, tb from st partition by tbname state_window(a);
sql create stream streams10 trigger at_once ignore expired 0 ignore update 0 into streamt10 as select ts, b, c, last(c), ta, tb from st partition by tbname event_window start with d = 0 end with d = 9;
sql create stream streams11 trigger at_once ignore expired 1 ignore update 0 watermark 100s into streamt11 as select ts, b, c, last(c), ta, tb from st partition by tbname count_window(2);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791213000,2,3,4,1.1);
sql insert into t2 values(1648791215000,3,4,5,1.1);
sql insert into t2 values(1648791217000,4,5,6,1.1);
sql insert into t1 values(1648791211000,1,2,3,0);
sql insert into t1 values(1648791213000,2,3,4,0);
sql insert into t2 values(1648791215000,3,4,5,0);
sql insert into t2 values(1648791217000,4,5,6,0);
$loop_count = 0

View File

@ -240,6 +240,49 @@ class TDTestCase:
self.show_create_sysdb_sql()
self.show_create_systb_sql()
self.show_column_name()
self.test_show_variables()
def get_variable(self, name: str, local: bool = True):
if local:
sql = 'show local variables'
else:
sql = f'select `value` from information_schema.ins_dnode_variables where name like "{name}"'
tdSql.query(sql, queryTimes=1)
res = tdSql.queryResult
if local:
for row in res:
if row[0] == name:
return row[1]
else:
if len(res) > 0:
return res[0][0]
raise Exception(f"variable {name} not found")
def test_show_variables(self):
epsion = 0.0000001
var = 'minimalTmpDirGB'
expect_val: float = 10.11
sql = f'ALTER LOCAL "{var}" "{expect_val}"'
tdSql.execute(sql)
val: float = float(self.get_variable(var))
if val != expect_val:
tdLog.exit(f'failed to set local {var} to {expect_val} actually {val}')
error_vals = ['a', '10a', '', '1.100r', '1.12 r']
for error_val in error_vals:
tdSql.error(f'ALTER LOCAL "{var}" "{error_val}"')
var = 'supportVnodes'
expect_val = 1240 ## 1.211111 * 1024
sql = f'ALTER DNODE 1 "{var}" "1.211111k"'
tdSql.execute(sql, queryTimes=1)
val = int(self.get_variable(var, False))
if val != expect_val:
tdLog.exit(f'failed to set dnode {var} to {expect_val} actually {val}')
error_vals = ['a', '10a', '', '1.100r', '1.12 r', '5k']
for error_val in error_vals:
tdSql.error(f'ALTER DNODE 1 "{var}" "{error_val}"')
def stop(self):
tdSql.close()