Merge branch '3.0' into FIX/TD-19593-3.0

This commit is contained in:
Benguang Zhao 2022-10-19 09:38:36 +08:00
commit 00a066443f
52 changed files with 1283 additions and 1325 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG c64858f
GIT_TAG 9284147
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -945,7 +945,7 @@ MIN(expr)
MODE(expr)
```
**Description**:The value which has the highest frequency of occurrence. NULL is returned if there are multiple values which have highest frequency of occurrence.
**Description**:The value which has the highest frequency of occurrence. One random value is returned if there are multiple values which have highest frequency of occurrence.
**Return value type**: Same as the input data

View File

@ -946,7 +946,7 @@ MIN(expr)
MODE(expr)
```
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出NULL
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,则随机输出其中某个值
**返回数据类型**:与输入数据类型一致。

View File

@ -44,12 +44,17 @@ enum {
)
// clang-format on
typedef struct {
typedef struct SWinKey {
uint64_t groupId;
TSKEY ts;
} SWinKey;
static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
typedef struct SSessionKey {
STimeWindow win;
uint64_t groupId;
} SSessionKey;
static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
@ -69,7 +74,7 @@ static inline int sWinKeyCmprImpl(const void* pKey1, const void* pKey2) {
}
static inline int winKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
return sWinKeyCmprImpl(pKey1, pKey2);
return winKeyCmprImpl(pKey1, pKey2);
}
typedef struct {

View File

@ -25,6 +25,8 @@ extern "C" {
typedef struct SStreamTask SStreamTask;
typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
// incremental state storage
typedef struct {
SStreamTask* pOwner;
@ -32,6 +34,7 @@ typedef struct {
TTB* pStateDb;
TTB* pFuncStateDb;
TTB* pFillStateDb; // todo refactor
TTB* pSessionStateDb;
TXN txn;
int32_t number;
} SStreamState;
@ -57,6 +60,19 @@ int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number);
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionClear(SStreamState* pState);
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);

View File

@ -498,6 +498,7 @@ enum {
#define MAX_NUM_STR_SIZE 40
#define MAX_META_MSG_IN_BATCH 1048576
#define MAX_META_BATCH_RSP_SIZE (1 * 1048576 * 1024)
#ifdef __cplusplus
}

View File

@ -955,7 +955,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL:
if (!pRequest->validateOnly) {
code = execLocalCmd(pRequest, pQuery);
if (NULL == pQuery->pRoot) {
terrno = TSDB_CODE_INVALID_PARA;
code = terrno;
} else {
code = execLocalCmd(pRequest, pQuery);
}
}
break;
case QUERY_EXEC_MODE_RPC:
@ -997,7 +1002,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno;
}
@ -2254,7 +2259,10 @@ void syncQueryFn(void* param, void* res, int32_t code) {
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
fp(param, NULL, terrno);
if (fp) {
fp(param, NULL, terrno);
}
return;
}

View File

@ -944,7 +944,6 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
if (pResultInfo->completed) {
// it is a local executed query, no need to do async fetch
if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) {
ASSERT(pResultInfo->numOfRows >= 0);
if (pResultInfo->localResultFetched) {
pResultInfo->numOfRows = 0;
pResultInfo->current = 0;

View File

@ -292,8 +292,10 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
struct SCatalog* pCatalog = NULL;
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
}
}
taosMemoryFree(pMsg->pData);
@ -397,6 +399,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
*pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) {
blockDataDestroy(pBlock);
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -1372,8 +1372,14 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj *s1 = *(SHashObj **)key1;
SHashObj *s2 = *(SHashObj **)key2;
SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN);
SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN);
if(!kv1pp || !kv2pp){
uError("smlKvTimeHashCompare kv is null");
return -1;
}
SSmlKv *kv1 = *kv1pp;
SSmlKv *kv2 = *kv2pp;
if(!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP){
uError("smlKvTimeHashCompare kv1");
return -1;

View File

@ -152,7 +152,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
pStmt->bInfo.tbType = pTableMeta->tableType;
pStmt->bInfo.boundTags = tags;
pStmt->bInfo.tagsCached = false;
strcpy(pStmt->bInfo.stbFName, sTableName);
tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
return TSDB_CODE_SUCCESS;
}

View File

@ -210,6 +210,8 @@ typedef struct {
typedef struct {
SMqCommitCbParamSet* params;
STqOffset* pOffset;
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/
/*int32_t vgId;*/
} SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() {
@ -407,6 +409,14 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
return 0;
}
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
if (waitingRspNum == 0) {
tmqCommitDone(pParamSet);
}
}
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
@ -420,18 +430,13 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
#endif
taosMemoryFree(pParam->pOffset);
if (pBuf->pData) taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pData);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/
// count down waiting rsp
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
tmqCommitRspCountDown(pParamSet);
if (waitingRspNum == 0) {
tmqCommitDone(pParamSet);
}
return 0;
}
@ -591,14 +596,10 @@ FAIL:
return 0;
}
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
int32_t code = -1;
if (msg != NULL) {
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -646,33 +647,37 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
}
}
// no request is sent
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return 0;
}
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
if (waitingRspNum == 0) {
tmqCommitDone(pParamSet);
}
// count down since waiting rsp num init as 1
tmqCommitRspCountDown(pParamSet);
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
}
#if 0
if (!async) {
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif
}
return 0;
return code;
}
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
if (msg) {
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else {
return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);
}
}
void tmqAssignAskEpTask(void* param, void* tmrId) {

View File

@ -1892,12 +1892,13 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
if (len >= size - 1) return dumpBuf;
continue;
}
void* var = colDataGetData(pColInfoData, j);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
memset(pBuf, 0, sizeof(pBuf));
@ -1926,8 +1927,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
if (len >= size - 1) return dumpBuf;
// len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
// if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_BOOL:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);

View File

@ -745,6 +745,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
return;
}
// ASSERT(0);
if (cfg.myIndex == -1) {
#if 1

View File

@ -90,14 +90,39 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
}
for (int32_t i = 0; i < msgNum; ++i) {
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgIdx);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgType);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
return -1;
}
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
if (offset >= pMsg->contLen) {
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
return -1;
}
req.msg = (char *)pMsg->pCont + offset;
offset += req.msgLen;

View File

@ -2553,12 +2553,17 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
char rollup[160 + VARSTR_HEADER_SIZE] = {0};
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
char *sep = ", ";
int32_t sepLen = strlen(sep);
int32_t rollupLen = sizeof(rollup) - 2;
for (int32_t i = 0; i < rollupNum; ++i) {
char *funcName = taosArrayGet(pStb->pFuncs, i);
if (i) {
strcat(varDataVal(rollup), ", ");
strncat(varDataVal(rollup), sep, rollupLen);
rollupLen -= sepLen;
}
strcat(varDataVal(rollup), funcName);
strncat(varDataVal(rollup), funcName, rollupLen);
rollupLen -= strlen(funcName);
}
varDataSetLen(rollup, strlen(varDataVal(rollup)));

View File

@ -293,6 +293,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
if (code == 0) {
tsem_wait(&pMgmt->syncSem);
} else if (code > 0) {
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock);
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0;
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY;
} else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {

View File

@ -149,7 +149,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
// tqRead
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
@ -181,8 +181,8 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey)
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t ver);

View File

@ -596,7 +596,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitTaosxRsp(&taosxRsp, pReq);
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
tqScan(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew);
if (metaRsp.metaRspLen > 0) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
@ -927,7 +927,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink1;
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline;
ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

View File

@ -123,7 +123,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
return 0;
}
int32_t tqScan(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;

View File

@ -284,7 +284,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return ret;
}
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
int64_t suid = pTask->tbSink.stbUid;
@ -530,7 +530,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
taosArrayDestroy(tagArray);
}
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
SBatchDeleteReq deleteReq = {0};

View File

@ -330,6 +330,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
rspSize += sizeof(int32_t);
offset = 0;
if (rspSize > MAX_META_BATCH_RSP_SIZE) {
code = TSDB_CODE_INVALID_MSG_LEN;
goto _exit;
}
pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -302,9 +302,11 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
_return:
taosMemoryFreeClear(output->tbMeta);
taosMemoryFreeClear(output);
if (output) {
taosMemoryFreeClear(output->tbMeta);
taosMemoryFreeClear(output);
}
CTG_RET(code);
}

View File

@ -252,7 +252,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgIndexCtx* ctx = task.taskCtx;
strcpy(ctx->indexFName, name);
tstrncpy(ctx->indexFName, name, sizeof(ctx->indexFName));
taosArrayPush(pJob->pTasks, &task);
@ -277,7 +277,7 @@ int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
SCtgUdfCtx* ctx = task.taskCtx;
strcpy(ctx->udfName, name);
tstrncpy(ctx->udfName, name, sizeof(ctx->udfName));
taosArrayPush(pJob->pTasks, &task);

View File

@ -660,7 +660,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
op->data = msg;
@ -693,7 +693,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
op->data = msg;
@ -721,8 +721,8 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
strncpy(msg->stbName, stbName, sizeof(msg->stbName));
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
msg->dbId = dbId;
msg->suid = suid;
@ -751,8 +751,8 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
strncpy(msg->tbName, tbName, sizeof(msg->tbName));
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
msg->dbId = dbId;
op->data = msg;
@ -785,7 +785,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
dbFName = p + 1;
}
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->pCtg = pCtg;
msg->dbId = dbId;
msg->dbInfo = dbInfo;
@ -817,7 +817,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
char *p = strchr(output->dbFName, '.');
if (p && IS_SYS_DBNAME(p + 1)) {
memmove(output->dbFName, p + 1, strlen(p + 1));
int32_t len = strlen(p + 1);
memmove(output->dbFName, p + 1, len >= TSDB_DB_FNAME_LEN ? TSDB_DB_FNAME_LEN - 1 : len);
}
msg->pCtg = pCtg;
@ -852,7 +853,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
}
msg->pCtg = pCtg;
strcpy(msg->dbFName, dbFName);
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->vgId = vgId;
msg->epSet = *pEpSet;
@ -1215,7 +1216,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_CACHE_STAT_INC(numOfDb, 1);
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
@ -1331,8 +1332,8 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uin
metaRent.smaVer = pCache->pIndex->version;
}
strcpy(metaRent.dbFName, dbFName);
strcpy(metaRent.stbName, tbName);
tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName));
tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
@ -1418,8 +1419,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
meta->tableType);
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
if (pCache) {
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
}
return TSDB_CODE_SUCCESS;
}
@ -1590,7 +1593,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
@ -1680,9 +1683,9 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
CTG_ERR_JRET(
ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize));
code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize);
pMeta->tbMeta = NULL;
CTG_ERR_JRET(code);
}
if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
@ -1697,10 +1700,8 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
_return:
if (pMeta) {
taosMemoryFreeClear(pMeta->tbMeta);
taosMemoryFreeClear(pMeta);
}
taosMemoryFreeClear(pMeta->tbMeta);
taosMemoryFreeClear(pMeta);
taosMemoryFreeClear(msg);

View File

@ -361,7 +361,12 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
SArray* pTagVals = NULL;
STag* pTag = (STag*)pCfg->pTags;
if (pCfg->pTags && tTagIsJson(pTag)) {
if (NULL == pCfg->pTags || pCfg->numOfTags <= 0) {
qError("tag missed in table cfg, pointer:%p, numOfTags:%d", pCfg->pTags, pCfg->numOfTags);
return TSDB_CODE_APP_ERROR;
}
if (tTagIsJson(pTag)) {
char* pJson = parseTagDatatoJson(pTag);
if (pJson) {
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);

View File

@ -53,6 +53,11 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
enum {
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
@ -434,15 +439,15 @@ typedef struct SCatchSupporter {
} SCatchSupporter;
typedef struct SStreamAggSupporter {
SHashObj* pResultRows;
SArray* pCurWins;
int32_t valueSize;
int32_t keySize;
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // buffer page that is active
SSDataBlock* pScanBlock;
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock;
SStreamState* pState;
int64_t gap; // stream session window gap
SqlFunctionCtx* pDummyCtx; // for combine
SSHashObj* pResultRows;
int32_t stateKeySize;
int16_t stateKeyType;
SDiskbasedBuf* pResultBuf;
} SStreamAggSupporter;
typedef struct SWindowSupporter {
@ -736,42 +741,54 @@ typedef struct SSessionAggOperatorInfo {
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
SResultRowPosition pos;
STimeWindow win;
uint64_t groupId;
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
bool isClosed;
} SResultWindowInfo;
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys stateKey;
SStateKeys* pStateKey;
} SStateWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int64_t gap; // session window gap
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SHashObj* pStDeleted;
SSHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SSDataBlock* pDelRes;
SSHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
SOptrBasicInfo binfo;
SPartitionBySupporter partitionSup;
@ -834,24 +851,6 @@ typedef struct SStateWindowOperatorInfo {
const SNode* pCondition;
} SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes;
SHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SHashObj* pGroupIdTbNameMap;
} SStreamStateAggOperatorInfo;
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
@ -1064,13 +1063,8 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex);
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
@ -1100,6 +1094,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);

View File

@ -143,9 +143,15 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
if (NULL == pBuf) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
if (!allocBuf(pDispatcher, pInput, pBuf)) {
taosFreeQitem(pBuf);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);

View File

@ -323,7 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
int32_t code =
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) {
destroyDataSinker((SDataSinkHandle*)pInserterNode);
destroyDataSinker((SDataSinkHandle*)inserter);
return code;
}

View File

@ -4192,42 +4192,6 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
return TSDB_CODE_SUCCESS;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size) {
pSup->currentPageId = -1;
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSup->valueSize = size;
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
int32_t pageSize = 4096;
while (pageSize < pSup->resultRowSize * 4) {
pageSize <<= 1u;
}
// at least four pages need to be in buffer
int32_t bufSize = 4096 * 256;
if (bufSize <= pageSize) {
bufSize = pageSize * 4;
}
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
}
return code;
}
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SWinKey key = {
@ -4237,7 +4201,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
char* value = NULL;
int32_t size = pAggSup->resultRowSize;
tSimpleHashPut(pAggSup->pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
@ -4342,3 +4305,82 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
streamStateSessionPut(pState, key, (const void*)buf, size);
releaseOutputBuf(pState, NULL, (SResultRow*)buf);
return TSDB_CODE_SUCCESS;
}
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
int32_t size = 0;
void* pVal = NULL;
int32_t code = streamStateSessionGet(pState, pKey, &pVal, &size);
ASSERT(code == 0);
SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseOutputBuf(pState, NULL, pRow);
continue;
}
if (pBlock->info.groupId == 0) {
pBlock->info.groupId = pKey->groupId;
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.groupId != pKey->groupId) {
releaseOutputBuf(pState, NULL, pRow);
break;
}
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pState, NULL, pRow);
break;
}
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
}
}
}
pBlock->info.rows += pRow->numOfRows;
// saveSessionDiscBuf(pState, pKey, pVal, size);
releaseOutputBuf(pState, NULL, pRow);
}
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}

View File

@ -1190,23 +1190,22 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int32_t dummy = 0;
int64_t version = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
// gap must be 0.
SResultWindowInfo* pStartWin =
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
if (!pStartWin) {
SSessionKey startWin = {0};
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin);
if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
// window has been closed.
continue;
}
SResultWindowInfo* pEndWin =
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
ASSERT(pEndWin);
TSKEY ts = INT64_MIN;
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
SSessionKey endWin = {0};
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));
colDataAppend(pDestStartCol, i, (const char*)&startWin.win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
colDataAppendNULL(pDestUidCol, i);
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
colDataAppendNULL(pDestCalStartTsCol, i);

View File

@ -693,7 +693,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
tSimpleHashCleanup(pFillSup->pResMap);
pFillSup->pResMap = NULL;
streamStateReleaseBuf(NULL, NULL, pFillSup->cur.pRowVal);
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
pFillSup->cur.pRowVal = NULL;
taosMemoryFree(pFillSup);
@ -736,7 +736,7 @@ static void resetFillWindow(SResultRowData* pRowData) {
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, SStreamState* pState) {
resetFillWindow(&pFillSup->prev);
streamStateReleaseBuf(NULL, NULL, pFillSup->cur.pRowVal);
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
resetFillWindow(&pFillSup->cur);
resetFillWindow(&pFillSup->next);
resetFillWindow(&pFillSup->nextNext);

File diff suppressed because it is too large Load Diff

View File

@ -2527,6 +2527,8 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
int32_t start = pInput->startRowIndex;
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
buildTDigestInfo(pInfo);
tdigestAutoFill(pInfo->pTDigest, COMPRESSION);
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
@ -2540,13 +2542,11 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
tdigestAdd(pInfo->pTDigest, v, w);
}
} else {
qDebug("%s before add %d elements into histogram, total:%" PRId64 ", numOfEntry:%d, pHisto:%p, elems: %p",
__FUNCTION__, numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto,
pInfo->pHisto->elems);
// might be a race condition here that pHisto can be overwritten or setup function
// has not been called, need to relink the buffer pHisto points to.
buildHistogramInfo(pInfo);
qDebug("%s before add %d elements into histogram, total:%" PRId64 ", numOfEntry:%d, pHisto:%p, elems: %p", __FUNCTION__,
numOfElems, pInfo->pHisto->numOfElems, pInfo->pHisto->numOfEntries, pInfo->pHisto, pInfo->pHisto->elems);
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
@ -2581,8 +2581,9 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
buildTDigestInfo(pOutput);
TDigest* pTDigest = pOutput->pTDigest;
tdigestAutoFill(pTDigest, COMPRESSION);
if (pTDigest->num_centroids <= 0) {
if (pTDigest->num_centroids <= 0 && pTDigest->num_buffered_pts == 0) {
memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pTDigest, COMPRESSION);
} else {
@ -2654,6 +2655,7 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
buildTDigestInfo(pInfo);
tdigestAutoFill(pInfo->pTDigest, COMPRESSION);
if (pInfo->pTDigest->size > 0) {
pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100);
} else { // no need to free
@ -5358,16 +5360,14 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t maxCount = 0;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
if (pItem->count > maxCount) {
if (pItem->count >= maxCount) {
maxCount = pItem->count;
resIndex = i;
} else if (pItem->count == maxCount) {
resIndex = -1;
}
}
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false);
colDataAppend(pCol, currentRow, pResItem->data, (maxCount == 0) ? true : false);
return pResInfo->numOfRes;
}

View File

@ -357,8 +357,7 @@ char* parseTagDatatoJson(void* p) {
for (int j = 0; j < nCols; ++j) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
// json value
char type = pTagVal->type;
if (type == TSDB_DATA_TYPE_NULL) {

View File

@ -173,7 +173,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
}
SDbCfgReq dbCfgReq = {0};
strcpy(dbCfgReq.db, input);
strncpy(dbCfgReq.db, input, sizeof(dbCfgReq.db) - 1);
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
void *pBuf = (*mallcFp)(bufLen);
@ -191,7 +191,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
}
SUserIndexReq indexReq = {0};
strcpy(indexReq.indexFName, input);
strncpy(indexReq.indexFName, input, sizeof(indexReq.indexFName) - 1);
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
void *pBuf = (*mallcFp)(bufLen);
@ -233,7 +233,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
}
SGetUserAuthReq req = {0};
strncpy(req.user, input, sizeof(req.user));
strncpy(req.user, input, sizeof(req.user) - 1);
int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req);
void *pBuf = (*mallcFp)(bufLen);
@ -251,7 +251,7 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
}
STableIndexReq indexReq = {0};
strcpy(indexReq.tbFName, input);
strncpy(indexReq.tbFName, input, sizeof(indexReq.tbFName) - 1);
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
void *pBuf = (*mallcFp)(bufLen);
@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
SBuildTableInput *pInput = input;
STableCfgReq cfgReq = {0};
cfgReq.header.vgId = pInput->vgId;
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName));
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName));
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName) - 1);
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName) - 1);
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
void *pBuf = (*mallcFp)(bufLen);

View File

@ -412,7 +412,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
while (true) {
paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
if (paramIdx == tListLen(gQwMgmt.param)) {
newParamIdx = 0;
newParamIdx = 1;
} else {
newParamIdx = paramIdx + 1;
}
@ -422,6 +422,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
}
}
if (paramIdx == tListLen(gQwMgmt.param)) {
paramIdx = 0;
}
gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
gQwMgmt.param[paramIdx].refId = refId;

View File

@ -398,7 +398,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
break;
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {

View File

@ -430,7 +430,8 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode);
code = rspCode;
goto _return;
}
}

View File

@ -24,6 +24,40 @@ typedef struct SStateKey {
int64_t opNum;
} SStateKey;
typedef struct SStateSessionKey {
SSessionKey key;
int64_t opNum;
} SStateSessionKey;
static inline int sessionKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) {
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
return -1;
}
if (pWin1->win.skey > pWin2->win.ekey) {
return 1;
} else if (pWin1->win.ekey < pWin2->win.skey) {
return -1;
}
return 0;
}
static inline int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateSessionKey* pWin1 = (SStateSessionKey*)pKey1;
SStateSessionKey* pWin2 = (SStateSessionKey*)pKey2;
if (pWin1->opNum > pWin2->opNum) {
return 1;
} else if (pWin1->opNum < pWin2->opNum) {
return -1;
}
return sessionKeyCmpr(&pWin1->key, &pWin2->key);
}
static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStateKey* pWin1 = (SStateKey*)pKey1;
SStateKey* pWin2 = (SStateKey*)pKey2;
@ -79,6 +113,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
goto _err;
}
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db,
&pState->pSessionStateDb) < 0) {
goto _err;
}
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) {
goto _err;
}
@ -95,6 +134,7 @@ _err:
tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb);
tdbTbClose(pState->pSessionStateDb);
tdbClose(pState->db);
taosMemoryFree(pState);
return NULL;
@ -105,6 +145,7 @@ void streamStateClose(SStreamState* pState) {
tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb);
tdbTbClose(pState->pFillStateDb);
tdbTbClose(pState->pSessionStateDb);
tdbClose(pState->db);
taosMemoryFree(pState);
@ -241,11 +282,11 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
int32_t c;
int32_t c = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
if (c != 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
pCur->number = pState->number;
@ -257,7 +298,7 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key)
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL);
int32_t c;
int32_t c = 0;
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
if (c != 0) {
streamStateFreeCur(pCur);
@ -348,21 +389,21 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
SStateKey sKey = {.key = *key, .opNum = pState->number};
int32_t c;
int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
@ -375,20 +416,20 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
return NULL;
}
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
int32_t c;
int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
@ -401,20 +442,20 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
return NULL;
}
if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
int32_t c;
int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
if (c < 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
taosMemoryFree(pCur);
streamStateFreeCur(pCur);
return NULL;
}
@ -445,3 +486,229 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
}
void streamFreeVal(void* val) { tdbFree(val); }
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn);
}
SStreamStateCur* streamStateSessionGetRanomCur(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL);
int32_t c = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c);
if (c != 0) {
streamStateFreeCur(pCur);
return NULL;
}
pCur->number = pState->number;
return pCur;
}
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
void* tmp = NULL;
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) {
*pVal = tdbRealloc(NULL, *pVLen);
memcpy(*pVal, tmp, *pVLen);
streamStateFreeCur(pCur);
return 0;
}
streamStateFreeCur(pCur);
return -1;
}
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn);
}
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) {
tdbTbcClose(pCur->pCur);
streamStateFreeCur(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
const SStateSessionKey* pKTmp = NULL;
int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
return -1;
}
if (pKTmp->opNum != pCur->number) {
return -1;
}
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
return -1;
}
*pKey = pKTmp->key;
return 0;
}
int32_t streamStateSessionClear(SStreamState* pState) {
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
streamStateSessionPut(pState, &key, NULL, 0);
SStreamStateCur* pCur = streamStateSessionSeekKeyNext(pState, &key);
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, buf, &size);
if (code == 0) {
memset(buf, 0, size);
streamStateSessionPut(pState, &delKey, buf, size);
} else {
break;
}
streamStateCurNext(pState, pCur);
}
streamStateFreeCur(pCur);
streamStateSessionDel(pState, &key);
return 0;
}
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
SSessionKey resKey = *key;
while (1) {
streamStateCurPrev(pState, pCur);
SSessionKey tmpKey = *key;
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS && sessionKeyCmpr(key, &tmpKey) == 0) {
resKey = tmpKey;
} else {
break;
}
}
streamStateFreeCur(pCur);
return streamStateSessionGetRanomCur(pState, &resKey);
}
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
// todo refactor
SStreamStateCur* pCur = streamStateSessionGetCur(pState, key);
int32_t size = *pVLen;
void* tmp = NULL;
*pVal = tdbRealloc(NULL, size);
memset(*pVal, 0, size);
if (streamStateSessionGetKVByCur(pCur, key, (const void**)&tmp, pVLen) == 0) {
memcpy(*pVal, tmp, *pVLen);
streamStateFreeCur(pCur);
return 0;
}
streamStateFreeCur(pCur);
return 1;
}
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
// todo refactor
int32_t res = TSDB_CODE_SUCCESS;
SSessionKey tmpKey = *key;
int32_t valSize = *pVLen;
void* tmp = tdbRealloc(NULL, valSize);
if (!tmp) {
return -1;
}
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) {
memcpy(tmp, *pVal, valSize);
*pVal = tmp;
streamStateFreeCur(pCur);
return res;
}
streamStateFreeCur(pCur);
streamStateSessionPut(pState, key, NULL, 0);
pCur = streamStateSessionGetRanomCur(pState, key);
streamStateCurPrev(pState, pCur);
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
goto _end;
}
}
streamStateFreeCur(pCur);
*key = tmpKey;
pCur = streamStateSessionSeekKeyNext(pState, key);
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
goto _end;
}
}
*key = tmpKey;
res = 1;
memset(tmp, 0, valSize);
_end:
*pVal = tmp;
streamStateSessionDel(pState, &tmpKey);
streamStateFreeCur(pCur);
return res;
}

View File

@ -3032,7 +3032,8 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin
}
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType));
// return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
@ -3084,7 +3085,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// user commit
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
bool internalExecute = true;
if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
if ((ths->replicaNum == 1) && ths->restoreFinish) {
// if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
internalExecute = false;
}

View File

@ -249,12 +249,18 @@
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/sliding.sim
#./test.sh -f tsim/stream/partitionbyColumnInterval.sim
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
#./test.sh -f tsim/stream/partitionbyColumnState.sim
#./test.sh -f tsim/stream/deleteInterval.sim
#./test.sh -f tsim/stream/deleteSession.sim
#./test.sh -f tsim/stream/deleteState.sim
#./test.sh -f tsim/stream/fillIntervalDelete0.sim
#./test.sh -f tsim/stream/fillIntervalDelete1.sim
./test.sh -f tsim/stream/fillIntervalLinear.sim
#./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
./test.sh -f tsim/stream/fillIntervalPrevNext.sim
./test.sh -f tsim/stream/fillIntervalValue.sim
# ---- transaction ----
./test.sh -f tsim/trans/lossdata1.sim

View File

@ -17,9 +17,9 @@ sql create table $table1 (ts timestamp, b binary(20))
sql create table $table2 (ts timestamp, b binary(20))
sql insert into $table1 values(now, "table_name")
sql insert into $table1 values(now-1m, "tablexname")
sql insert into $table1 values(now-3m, "tablexname")
sql insert into $table1 values(now-2m, "tablexxx")
sql insert into $table1 values(now-2m, "table")
sql insert into $table1 values(now-1m, "table")
sql select b from $table1
if $rows != 4 then

View File

@ -216,12 +216,12 @@ if $data02 != 3.274823935 then
goto loop2
endi
if $data03 != 1.800000000 then
if $data03 != 1.500000000 then
print ======$data03
return -1
endi
if $data04 != 3.350000000 then
if $data04 != 3.500000000 then
print ======$data04
return -1
endi

View File

@ -5,15 +5,15 @@ sleep 50
sql connect
print =============== create database
sql create database test vgroups 1
sql select * from information_schema.ins_databases
sql create database test vgroups 1;
sql select * from information_schema.ins_databases;
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double,id int);

View File

@ -349,7 +349,7 @@ endi
if $rows != 3 then
print ====loop4=rows=$rows
# goto loop4
goto loop4
endi
# row 0

View File

@ -13,7 +13,7 @@ endi
print $data00 $data01 $data02
sql use test
sql use test;
sql create table t2(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger window_close into streamt2 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 session(ts, 10s);
@ -58,16 +58,11 @@ endi
sql insert into t2 values(1648791233002,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
if $rows != 0 then
print ======$rows
return -1
endi
if $data01 != 6 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791253003,1,2,3,1.0);
sleep 300
sql select * from streamt2;

View File

@ -102,27 +102,8 @@ int smlProcess_json1_Test() {
taos_free_result(pRes);
const char *sql[] = {
"["
" {"
" \"metric\": \"sys.cpu.nice\","
" \"timestamp\": 0,"
" \"value\": 18,"
" \"tags\": {"
" \"host\": \"web01\","
" \"id\": \"t1\","
" \"dc\": \"lga\""
" }"
" },"
" {"
" \"metric\": \"sys.cpu.nice\","
" \"timestamp\": 1662344042,"
" \"value\": 9,"
" \"tags\": {"
" \"host\": \"web02\","
" \"dc\": \"lga\""
" }"
" }"
"]",};
"[{\"metric\":\"sys.cpu.nice\",\"timestamp\":0,\"value\":18,\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}},{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344042,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":\"lga\"}}]"
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
@ -143,28 +124,8 @@ int smlProcess_json2_Test() {
taos_free_result(pRes);
const char *sql[] = {
"{"
" \"metric\": \"meter_current0\","
" \"timestamp\": {"
" \"value\" : 1662344042,"
" \"type\" : \"s\""
" },"
" \"value\": {"
" \"value\" : 10.3,"
" \"type\" : \"i64\""
" },"
" \"tags\": {"
" \"groupid\": { "
" \"value\" : 2,"
" \"type\" : \"bigint\""
" },"
" \"location\": { "
" \"value\" : \"北京\","
" \"type\" : \"binary\""
" },"
" \"id\": \"d1001\""
" }"
"}",};
"{\"metric\":\"meter_current0\",\"timestamp\":{\"value\":1662344042,\"type\":\"s\"},\"value\":{\"value\":10.3,\"type\":\"i64\"},\"tags\":{\"groupid\":{\"value\":2,\"type\":\"bigint\"},\"location\":{\"value\":\"北京\",\"type\":\"binary\"},\"id\":\"d1001\"}}"
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
@ -185,56 +146,7 @@ int smlProcess_json3_Test() {
taos_free_result(pRes);
const char *sql[] = {
"{"
" \"metric\": \"meter_current1\","
" \"timestamp\": {"
" \"value\" : 1662344042,"
" \"type\" : \"s\""
" },"
" \"value\": {"
" \"value\" : 10.3,"
" \"type\" : \"i64\""
" },"
" \"tags\": {"
" \"t1\": { "
" \"value\" : 2,"
" \"type\" : \"bigint\""
" },"
" \"t2\": { "
" \"value\" : 2,"
" \"type\" : \"int\""
" },"
" \"t3\": { "
" \"value\" : 2,"
" \"type\" : \"i16\""
" },"
" \"t4\": { "
" \"value\" : 2,"
" \"type\" : \"i8\""
" },"
" \"t5\": { "
" \"value\" : 2,"
" \"type\" : \"f32\""
" },"
" \"t6\": { "
" \"value\" : 2,"
" \"type\" : \"double\""
" },"
" \"t7\": { "
" \"value\" : \"8323\","
" \"type\" : \"binary\""
" },"
" \"t8\": { "
" \"value\" : \"北京\","
" \"type\" : \"nchar\""
" },"
" \"t9\": { "
" \"value\" : true,"
" \"type\" : \"bool\""
" },"
" \"id\": \"d1001\""
" }"
"}",};
"{\"metric\":\"meter_current1\",\"timestamp\":{\"value\":1662344042,\"type\":\"s\"},\"value\":{\"value\":10.3,\"type\":\"i64\"},\"tags\":{\"t1\":{\"value\":2,\"type\":\"bigint\"},\"t2\":{\"value\":2,\"type\":\"int\"},\"t3\":{\"value\":2,\"type\":\"i16\"},\"t4\":{\"value\":2,\"type\":\"i8\"},\"t5\":{\"value\":2,\"type\":\"f32\"},\"t6\":{\"value\":2,\"type\":\"double\"},\"t7\":{\"value\":\"8323\",\"type\":\"binary\"},\"t8\":{\"value\":\"北京\",\"type\":\"nchar\"},\"t9\":{\"value\":true,\"type\":\"bool\"},\"id\":\"d1001\"}}"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
@ -255,47 +167,8 @@ int smlProcess_json4_Test() {
taos_free_result(pRes);
const char *sql[] = {
"{"
" \"metric\": \"meter_current2\","
" \"timestamp\": {"
" \"value\" : 1662344042000,"
" \"type\" : \"ms\""
" },"
" \"value\": \"ni\","
" \"tags\": {"
" \"t1\": { "
" \"value\" : 20,"
" \"type\" : \"i64\""
" },"
" \"t2\": { "
" \"value\" : 25,"
" \"type\" : \"i32\""
" },"
" \"t3\": { "
" \"value\" : 2,"
" \"type\" : \"smallint\""
" },"
" \"t4\": { "
" \"value\" : 2,"
" \"type\" : \"tinyint\""
" },"
" \"t5\": { "
" \"value\" : 2,"
" \"type\" : \"float\""
" },"
" \"t6\": { "
" \"value\" : 0.2,"
" \"type\" : \"f64\""
" },"
" \"t7\": \"nsj\","
" \"t8\": { "
" \"value\" : \"北京\","
" \"type\" : \"nchar\""
" },"
" \"t9\": false,"
" \"id\": \"d1001\""
" }"
"}",};
"{\"metric\":\"meter_current2\",\"timestamp\":{\"value\":1662344042000,\"type\":\"ms\"},\"value\":\"ni\",\"tags\":{\"t1\":{\"value\":20,\"type\":\"i64\"},\"t2\":{\"value\":25,\"type\":\"i32\"},\"t3\":{\"value\":2,\"type\":\"smallint\"},\"t4\":{\"value\":2,\"type\":\"tinyint\"},\"t5\":{\"value\":2,\"type\":\"float\"},\"t6\":{\"value\":0.2,\"type\":\"f64\"},\"t7\":\"nsj\",\"t8\":{\"value\":\"北京\",\"type\":\"nchar\"},\"t9\":false,\"id\":\"d1001\"}}"
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));

View File

@ -130,15 +130,15 @@ void parseArgument(int32_t argc, char* argv[]) {
printHelp();
exit(0);
} else if (strcmp(argv[i], "-d") == 0) {
strcpy(g_stConfInfo.dbName, argv[++i]);
tstrncpy(g_stConfInfo.dbName, argv[++i], sizeof(g_stConfInfo.dbName));
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
tstrncpy(configDir, argv[++i], PATH_MAX);
} else if (strcmp(argv[i], "-s") == 0) {
strcpy(g_stConfInfo.stbName, argv[++i]);
tstrncpy(g_stConfInfo.stbName, argv[++i], sizeof(g_stConfInfo.stbName));
} else if (strcmp(argv[i], "-w") == 0) {
strcpy(g_stConfInfo.vnodeWalPath, argv[++i]);
tstrncpy(g_stConfInfo.vnodeWalPath, argv[++i], sizeof(g_stConfInfo.vnodeWalPath));
} else if (strcmp(argv[i], "-f") == 0) {
strcpy(g_stConfInfo.resultFileName, argv[++i]);
tstrncpy(g_stConfInfo.resultFileName, argv[++i], sizeof(g_stConfInfo.resultFileName));
} else if (strcmp(argv[i], "-t") == 0) {
g_stConfInfo.numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {

View File

@ -949,7 +949,7 @@ void parseConsumeInfo() {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) {
// printf("%s\n", token );
strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token, sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]));
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++;

View File

@ -37,6 +37,7 @@ void simLogSql(char *sql, bool useSharp) {
taosFsyncFile(pFile);
}
#if 0
char *simParseArbitratorName(char *varName) {
static char hostName[140];
#ifdef WINDOWS
@ -47,6 +48,7 @@ char *simParseArbitratorName(char *varName) {
#endif
return hostName;
}
#endif
char *simParseHostName(char *varName) {
static char hostName[140];
@ -102,9 +104,11 @@ char *simGetVariable(SScript *script, char *varName, int32_t varLen) {
return simParseHostName(varName);
}
#if 0
if (strncmp(varName, "arbitrator", 10) == 0) {
return simParseArbitratorName(varName);
}
#endif
if (strncmp(varName, "error", varLen) == 0) return script->error;
@ -883,6 +887,7 @@ bool simExecuteSqlSlowCmd(SScript *script, char *rest) {
return simExecuteSqlImpCmd(script, rest, isSlow);
}
#if 0
bool simExecuteRestfulCmd(SScript *script, char *rest) {
TdFilePtr pFile = NULL;
char filename[256];
@ -924,6 +929,7 @@ bool simExecuteRestfulCmd(SScript *script, char *rest) {
return simExecuteSystemCmd(script, cmd);
}
#endif
bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
char buf[3000];
@ -981,6 +987,7 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
return false;
}
#if 0
bool simExecuteLineInsertCmd(SScript *script, char *rest) {
char buf[TSDB_MAX_BINARY_LEN] = {0};
@ -1037,3 +1044,4 @@ bool simExecuteLineInsertErrorCmd(SScript *script, char *rest) {
return true;
}
}
#endif

View File

@ -501,6 +501,7 @@ bool simParseEndwCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
return true;
}
#if 0
bool simParseSwitchCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
char *token;
int32_t tokenLen;
@ -647,6 +648,7 @@ bool simParseContinueCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
numOfLines++;
return true;
}
#endif
bool simParsePrintCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
int32_t expLen;
@ -715,6 +717,7 @@ bool simParseSqlErrorCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
return true;
}
#if 0
bool simParseSqlSlowCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
simParseSqlCmd(rest, pCmd, lineNum);
cmdLine[numOfLines - 1].cmdno = SIM_CMD_SQL_SLOW;
@ -726,6 +729,7 @@ bool simParseRestfulCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
cmdLine[numOfLines - 1].cmdno = SIM_CMD_RESTFUL;
return true;
}
#endif
bool simParseSystemCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
int32_t expLen;
@ -838,6 +842,7 @@ bool simParseRunBackCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
return true;
}
#if 0
bool simParseLineInsertCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
int32_t expLen;
@ -869,6 +874,7 @@ bool simParseLineInsertErrorCmd(char *rest, SCommand *pCmd, int32_t lineNum) {
numOfLines++;
return true;
}
#endif
void simInitsimCmdList() {
int32_t cmdno;
@ -930,6 +936,7 @@ void simInitsimCmdList() {
simCmdList[cmdno].executeCmd = NULL;
simAddCmdIntoHash(&(simCmdList[cmdno]));
#if 0
cmdno = SIM_CMD_SWITCH;
simCmdList[cmdno].cmdno = cmdno;
strcpy(simCmdList[cmdno].name, "switch");
@ -977,6 +984,7 @@ void simInitsimCmdList() {
simCmdList[cmdno].parseCmd = simParseEndsCmd;
simCmdList[cmdno].executeCmd = NULL;
simAddCmdIntoHash(&(simCmdList[cmdno]));
#endif
cmdno = SIM_CMD_SLEEP;
simCmdList[cmdno].cmdno = cmdno;
@ -1050,6 +1058,7 @@ void simInitsimCmdList() {
simCmdList[cmdno].executeCmd = simExecuteSqlErrorCmd;
simAddCmdIntoHash(&(simCmdList[cmdno]));
#if 0
cmdno = SIM_CMD_SQL_SLOW;
simCmdList[cmdno].cmdno = cmdno;
strcpy(simCmdList[cmdno].name, "sql_slow");
@ -1065,6 +1074,7 @@ void simInitsimCmdList() {
simCmdList[cmdno].parseCmd = simParseRestfulCmd;
simCmdList[cmdno].executeCmd = simExecuteRestfulCmd;
simAddCmdIntoHash(&(simCmdList[cmdno]));
#endif
/* test is only an internal command */
cmdno = SIM_CMD_TEST;
@ -1082,6 +1092,7 @@ void simInitsimCmdList() {
simCmdList[cmdno].executeCmd = simExecuteReturnCmd;
simAddCmdIntoHash(&(simCmdList[cmdno]));
#if 0
cmdno = SIM_CMD_LINE_INSERT;
simCmdList[cmdno].cmdno = cmdno;
strcpy(simCmdList[cmdno].name, "line_insert");
@ -1097,4 +1108,5 @@ void simInitsimCmdList() {
simCmdList[cmdno].parseCmd = simParseLineInsertErrorCmd;
simCmdList[cmdno].executeCmd = simExecuteLineInsertErrorCmd;
simAddCmdIntoHash(&(simCmdList[cmdno]));
#endif
}