Merge remote-tracking branch 'origin/3.0' into fix/valgrind

This commit is contained in:
Shengliang Guan 2022-07-09 20:13:26 +08:00
commit c53a84b733
39 changed files with 779 additions and 536 deletions

View File

@ -55,7 +55,8 @@ enum {
enum { enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__DATA_SCAN, STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER, STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
@ -123,6 +124,7 @@ enum {
typedef struct { typedef struct {
int8_t fetchType; int8_t fetchType;
STqOffsetVal offset;
union { union {
SSDataBlock data; SSDataBlock data;
void* meta; void* meta;

View File

@ -231,7 +231,7 @@ SSDataBlock* createDataBlock();
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);

View File

@ -174,7 +174,13 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
*/ */
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
void* qExtractReaderFromStreamScanner(void* scanner); void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);

View File

@ -172,7 +172,13 @@ typedef struct tExprNode {
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)); void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
typedef enum {
SHOULD_FREE_COLDATA = 0x1, // the newly created column data needs to be destroyed.
DELEGATED_MGMT_COLDATA = 0x2, // input column data should not be released.
} ECOLDATA_MGMT_TYPE_E;
struct SScalarParam { struct SScalarParam {
ECOLDATA_MGMT_TYPE_E type;
SColumnInfoData *columnData; SColumnInfoData *columnData;
SHashObj *pHashFilter; SHashObj *pHashFilter;
int32_t hashValueType; int32_t hashValueType;

View File

@ -166,6 +166,7 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead); void walCloseReader(SWalReader *pRead);
int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead); int32_t walNextValidMsg(SWalReader *pRead);
// only for tq usage // only for tq usage

View File

@ -1356,7 +1356,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
return col; return col;
} }
SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index) { SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
if (index >= taosArrayGetSize(pBlock->pDataBlock)) { if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
return NULL; return NULL;

View File

@ -546,7 +546,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic); /*ASSERT(pTopic);*/
if (pTopic == NULL) {
mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic);
continue;
}
taosRLockLatch(&pTopic->lock); taosRLockLatch(&pTopic->lock);
rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key); rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);

View File

@ -26,7 +26,6 @@ target_sources(
"src/meta/metaSnapshot.c" "src/meta/metaSnapshot.c"
# sma # sma
"src/sma/sma.c"
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
"src/sma/smaUtil.c" "src/sma/smaUtil.c"
"src/sma/smaOpen.c" "src/sma/smaOpen.c"

View File

@ -174,6 +174,9 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);

View File

@ -47,7 +47,8 @@ struct SSmaEnv {
}; };
typedef struct { typedef struct {
int32_t smaRef; int8_t inited;
int32_t rsetId;
} SSmaMgmt; } SSmaMgmt;
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)
@ -95,6 +96,7 @@ enum {
TASK_TRIGGER_STAT_CANCELLED = 4, TASK_TRIGGER_STAT_CANCELLED = 4,
TASK_TRIGGER_STAT_FINISHED = 5, TASK_TRIGGER_STAT_FINISHED = 5,
}; };
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
@ -104,6 +106,10 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
int32_t tdLockSma(SSma *pSma); int32_t tdLockSma(SSma *pSma);

View File

@ -129,6 +129,7 @@ typedef struct {
static STqMgmt tqMgmt = {0}; static STqMgmt tqMgmt = {0};
// tqRead // tqRead
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec

View File

@ -163,6 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
const char* stbFullName, int32_t vgId); const char* stbFullName, int32_t vgId);
// sma // sma
int32_t smaInit();
void smaCleanUp();
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma); int32_t smaBegin(SSma* pSma);

View File

@ -1,257 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
// functions for external invocation
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
}
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
return code;
}
// functions for internal invocation
#if 0
/**
* @brief TODO: Assume that the final generated result it less than 3M
*
* @param pReq
* @param pDataBlocks
* @param vgId
* @param suid // TODO: check with Liao whether suid response is reasonable
*
* TODO: colId should be set
*/
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid, const char* stbName, bool isCreateCtb) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols));
bufSize += sizeof(SSubmitBlk);
}
*pReq = taosMemoryCalloc(1, bufSize);
if (!(*pReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
void* pDataBuf = *pReq;
SArray* pTagArray = NULL;
int32_t msgLen = sizeof(SSubmitReq);
int32_t numOfBlks = 0;
int32_t schemaLen = 0;
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
SDataBlockInfo* pDataBlkInfo = &pDataBlock->info;
int32_t colNum = pDataBlkInfo->numOfCols;
int32_t rows = pDataBlkInfo->rows;
int32_t rowSize = pDataBlkInfo->rowSize;
int64_t groupId = pDataBlkInfo->groupId;
if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
}
if(isCreateCtb) {
SMetaReader mr = {0};
const char* ctbName = buildCtbNameByGroupId(stbName, pDataBlock->info.groupId);
if (metaGetTableEntryByName(&mr, ctbName) != 0) {
smaDebug("vgId:%d, no tsma ctb %s exists", vgId, ctbName);
}
SVCreateTbReq ctbReq = {0};
ctbReq.name = ctbName;
ctbReq.type = TSDB_CHILD_TABLE;
ctbReq.ctb.suid = suid;
STagVal tagVal = {.cid = colNum + PRIMARYKEY_TIMESTAMP_COL_ID,
.type = TSDB_DATA_TYPE_BIGINT,
.i64 = groupId};
STag* pTag = NULL;
if(!pTagArray) {
pTagArray = taosArrayInit(1, sizeof(STagVal));
if (!pTagArray) goto _err;
}
taosArrayClear(pTagArray);
taosArrayPush(pTagArray, &tagVal);
tTagNew(pTagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&ctbReq);
goto _err;
}
ctbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &ctbReq, schemaLen, code);
tdDestroySVCreateTbReq(&ctbReq);
if (code < 0) {
goto _err;
}
}
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
pSubmitBlk->suid = suid;
pSubmitBlk->uid = groupId;
pSubmitBlk->numOfRows = rows;
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
STColumn* pCol = &pTSchema->columns[k];
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
if (!isStartKey) {
isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
offset, k);
} else {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
true, offset, k);
}
break;
case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break;
}
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
TASSERT(0);
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (pCol->type == pColInfoData->info.type) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
k);
} else {
char tv[8] = {0};
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else {
uint64_t v = 0;
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
}
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
k);
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0);
}
break;
}
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
}
dataLen += TD_ROW_LEN(rb.pBuf);
#ifdef TD_DEBUG_PRINT_ROW
tdSRowPrint(rb.pBuf, pTSchema, __func__);
#endif
}
++numOfBlks;
pSubmitBlk->dataLen = dataLen;
msgLen += pSubmitBlk->dataLen;
}
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen);
(*pReq)->length = (*pReq)->header.contLen;
(*pReq)->numOfBlocks = htonl(numOfBlks);
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
while (numOfBlks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(*pReq);
taosArrayDestroy(pTagArray);
return TSDB_CODE_FAILED;
}
#endif

View File

@ -121,7 +121,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
// step 3: perform persist task for qTaskInfo // step 3: perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat); tdRSmaPersistExecImpl(pRSmaStat);
smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma)); smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -173,6 +173,7 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
} }
if ((pDir = taosOpenDir(dir)) == NULL) { if ((pDir = taosOpenDir(dir)) == NULL) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;

View File

@ -18,7 +18,7 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
#define RSMA_TASK_INFO_HASH_SLOT 8 #define RSMA_TASK_INFO_HASH_SLOT 8
#define SMA_MGMT_REF_NUM 1024 #define SMA_MGMT_REF_NUM 10240
extern SSmaMgmt smaMgmt; extern SSmaMgmt smaMgmt;
@ -30,7 +30,62 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaE
static void *tdFreeTSmaStat(STSmaStat *pStat); static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat); static void tdDestroyRSmaStat(void *pRSmaStat);
/**
* @brief rsma init
*
* @return int32_t
*/
// implementation // implementation
int32_t smaInit() {
int8_t old;
int32_t nLoops = 0;
while (1) {
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
if (old != 2) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
if (old == 0) {
smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
if (smaMgmt.rsetId < 0) {
smaError("failed to init sma rset since %s", terrstr());
atomic_store_8(&smaMgmt.inited, 0);
return TSDB_CODE_FAILED;
}
smaInfo("sma rset is initialized, rsetId:%d", smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 1);
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief rsma cleanup
*
*/
void smaCleanUp() {
int8_t old;
int32_t nLoops = 0;
while (1) {
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
if (old != 2) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
if (old == 1) {
smaInfo("sma rset is cleaned up, resetId:%d", smaMgmt.rsetId);
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
}
}
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) { static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
SSmaEnv *pEnv = NULL; SSmaEnv *pEnv = NULL;
@ -135,17 +190,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
// init smaMgmt // init smaMgmt
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); smaInit();
if (smaMgmt.smaRef < 0) {
smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
int64_t refId = taosAddRef(smaMgmt.smaRef, pRSmaStat); int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
if (refId < 0) { if (refId < 0) {
smaError("taosAddRef smaRef failed, since:%s", tstrerror(terrno)); smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
smaMgmt.rsetId, SMA_MGMT_REF_NUM);
} }
pRSmaStat->refId = refId; pRSmaStat->refId = refId;
@ -275,8 +329,13 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) { if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
smaError("remove refId from rsmaRef:0x%" PRIx64 " failed since %s", RSMA_REF_ID(pRSmaStat), terrstr()); smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma),
RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr());
ASSERT(0);
} else {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma),
RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId);
} }
} else { } else {
ASSERT(0); ASSERT(0);
@ -323,7 +382,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
} }
break; break;
default: default:
TASSERT(0); smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -19,7 +19,8 @@
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.smaRef = -1, .inited = 0,
.rsetId = -1,
}; };
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver" #define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
@ -608,9 +609,11 @@ _err:
static void tdRSmaFetchTrigger(void *param, void *tmrId) { static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param; SRSmaInfoItem *pItem = param;
SSma *pSma = NULL; SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId); SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
if (!pStat) { if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed"); smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId);
return; return;
} }
@ -622,9 +625,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: case TASK_TRIGGER_STAT_CANCELLED:
case TASK_TRIGGER_STAT_FINISHED: { case TASK_TRIGGER_STAT_FINISHED: {
taosReleaseRef(smaMgmt.smaRef, pItem->refId); tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled", smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is %" PRIi8
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); ", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
return; return;
} }
default: default:
@ -665,7 +669,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
} }
_end: _end:
taosReleaseRef(smaMgmt.smaRef, pItem->refId); tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
} }
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
@ -1258,7 +1262,8 @@ _end:
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), smaMgmt.rsetId, pRSmaStat->refId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__);
taosThreadExit(NULL); taosThreadExit(NULL);
return NULL; return NULL;
} }
@ -1283,7 +1288,9 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d)", SMA_VID(pRSmaStat->pSma), smaMgmt.rsetId,
pRSmaStat->refId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__);
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
@ -1297,8 +1304,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/ */
static void tdRSmaPersistTrigger(void *param, void *tmrId) { static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *rsmaStat = param; SRSmaStat *rsmaStat = param;
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId); SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.rsetId, rsmaStat->refId);
ASSERT(0);
if (!pRSmaStat) { if (!pRSmaStat) {
smaDebug("rsma persistence task not start since already destroyed"); smaDebug("rsma persistence task not start since already destroyed");
return; return;
@ -1341,5 +1348,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
} break; } break;
} }
taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId); taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId);
} }

View File

@ -20,6 +20,36 @@
#define SMA_STORAGE_MINUTES_DAY 1440 #define SMA_STORAGE_MINUTES_DAY 1440
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file #define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
}
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
return code;
}
/** /**
* @brief Judge the tsma file split days * @brief Judge the tsma file split days
* *

View File

@ -294,4 +294,23 @@ int32_t tdRemoveTFile(STFile *pTFile) {
} }
// smaXXXUtil ================ // smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) {
smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
} else {
smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
}
return pResult;
}
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
if (taosReleaseRef(rsetId, refId) < 0) {
smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
return TSDB_CODE_FAILED;
}
smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
return TSDB_CODE_SUCCESS;
}
// ... // ...

View File

@ -244,11 +244,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// 1.find handle // 1.find handle
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) recv poll req in vgId:%d, req offset %s", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), buf);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
if (pHandle == NULL) { if (pHandle == NULL) {
@ -271,6 +266,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
} }
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed // 2.reset offset if needed
if (reqOffset.type > 0) { if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset; fetchOffsetNew = reqOffset;
@ -280,7 +280,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffsetNew = pOffset->val; fetchOffsetNew = pOffset->val;
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew); tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer:%" PRId64 ", offset reset to %s", consumerId, formatBuf); tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, offset reset to %s", consumerId, pHandle->subKey, formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@ -295,9 +295,30 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey,
fetchOffsetNew.version);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
dataRsp.rspOffset = fetchOffsetNew;
code = 0;
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
taosArrayDestroy(dataRsp.blockDataLen);
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
if (dataRsp.withSchema) {
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (dataRsp.withTbName) {
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
}
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: no offset committed for consumer:%" PRId64 ", in vgId:%d, subkey %s, reset none failed", tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
consumerId, TD_VID(pTq->pVnode), pReq->subKey); " in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1; return -1;
} }
@ -308,7 +329,24 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++;
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;
goto OVER;
}
if (dataRsp.blockNum == 0) {
// TODO add to async task
/*dataRsp.rspOffset.version--;*/
}
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
@ -320,9 +358,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch); consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 tqWarn("tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", ", found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break; break;
} }

View File

@ -46,7 +46,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
return 0; return 0;
} }
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t workerId) { static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) { if (metaGetTableEntryByUid(&mr, uid) < 0) {
@ -59,6 +59,53 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
return 0; return 0;
} }
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan1(task, pOffset) < 0) {
pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--;
return 0;
}
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
}
pRsp->blockNum++;
continue;
}
void* meta = qStreamExtractMetaMsg(task);
if (meta != NULL) {
// tq add meta to rsp
}
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
ASSERT(0);
}
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
}
ASSERT(pRsp->rspOffset.type != 0);
break;
}
return 0;
}
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
@ -67,7 +114,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
/*ASSERT(0);*/ /*ASSERT(0);*/
/*}*/ /*}*/
if (qStreamPrepareScan(task, offset.uid, offset.ts) < 0) { if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
ASSERT(0); ASSERT(0);
} }
@ -93,7 +140,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
if (qGetStreamScanStatus(task, &uid, &ts) < 0) { if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0); ASSERT(0);
} }
tqAddTbNameToRsp(pTq, uid, pRsp, workerId); tqAddTbNameToRsp(pTq, uid, pRsp);
#endif #endif
} }
pRsp->blockNum++; pRsp->blockNum++;
@ -129,7 +176,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp(pDataBlock, pRsp); tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId); tqAddTbNameToRsp(pTq, uid, pRsp);
} }
pRsp->blockNum++; pRsp->blockNum++;
} }
@ -146,7 +193,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp(&block, pRsp); tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId); tqAddTbNameToRsp(pTq, uid, pRsp);
} }
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
@ -164,7 +211,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp(&block, pRsp); tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId); tqAddTbNameToRsp(pTq, uid, pRsp);
} }
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++; pRsp->blockNum++;

View File

@ -15,11 +15,6 @@
#include "tq.h" #include "tq.h"
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset) {
/*if ()*/
return 0;
}
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex); taosThreadMutexLock(&pHandle->pWalReader->mutex);
@ -84,8 +79,10 @@ STqReader* tqOpenReader(SVnode* pVnode) {
return NULL; return NULL;
} }
// TODO open pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
/*pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);*/ if (pReader->pWalReader == NULL) {
return NULL;
}
pReader->pVnodeMeta = pVnode->pMeta; pReader->pVnodeMeta = pVnode->pMeta;
pReader->pMsg = NULL; pReader->pMsg = NULL;
@ -106,12 +103,19 @@ void tqCloseReader(STqReader* pReader) {
taosMemoryFree(pReader); taosMemoryFree(pReader);
} }
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
//
return walReadSeekVer(pReader->pWalReader, ver);
}
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
bool fromProcessedMsg = pReader->pMsg != NULL; bool fromProcessedMsg = pReader->pMsg != NULL;
while (1) { while (1) {
if (!fromProcessedMsg) { if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) { if (walNextValidMsg(pReader->pWalReader) < 0) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
return -1; return -1;
} }
@ -130,19 +134,25 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
memset(&ret->data, 0, sizeof(SSDataBlock)); memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock(&ret->data, pReader); int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
if (code != 0 || ret->data.info.rows == 0) { if (code != 0 || ret->data.info.rows == 0) {
ASSERT(0);
continue;
#if 0
if (fromProcessedMsg) { if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
return 0; return 0;
} else { } else {
break; break;
} }
#endif
} }
ret->fetchType = FETCH_TYPE__DATA; ret->fetchType = FETCH_TYPE__DATA;
return 0; return 0;
} }
if (fromProcessedMsg) { if (fromProcessedMsg) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ASSERT(pReader->ver != -1);
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
return 0; return 0;
} }

View File

@ -1256,6 +1256,8 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
if (invalidate) { if (invalidate) {
taosLRUCacheRelease(pCache, h, true); taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
} }
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
} }

View File

@ -2182,12 +2182,21 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
return VND_TSDB(pVnode); return VND_TSDB(pVnode);
} }
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) { SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
int64_t startVer = (pCond->startVersion == -1)? 0:pCond->startVersion;
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
return (SVersionRange){.minVer = pCond->startVersion, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)}; return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
} }
return (SVersionRange){.minVer = pCond->startVersion, .maxVer = pVnode->state.applied}; int64_t endVer = 0;
if (pCond->endVersion == -1) { // user not specified end version, set current maximum version of vnode as the endVersion
endVer = pVnode->state.applied;
} else {
endVer = (pCond->endVersion > pVnode->state.applied)? pVnode->state.applied:pCond->endVersion;
}
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
} }
// // todo not unref yet, since it is not support multi-group interpolation query // // todo not unref yet, since it is not support multi-group interpolation query

View File

@ -100,6 +100,7 @@ void vnodeCleanup() {
walCleanUp(); walCleanUp();
tqCleanUp(); tqCleanUp();
smaCleanUp();
} }
int vnodeScheduleTask(int (*execute)(void*), void* arg) { int vnodeScheduleTask(int (*execute)(void*), void* arg) {

View File

@ -57,7 +57,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX #define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
#define DELETE_GROUPID_COLUMN_INDEX 2 #define DELETE_GROUPID_COLUMN_INDEX 2
enum { enum {
// when this task starts to execute, this status will set // when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u, TASK_NOT_COMPLETED = 0x1u,
@ -139,6 +138,14 @@ typedef struct STaskIdInfo {
char* str; char* str;
} STaskIdInfo; } STaskIdInfo;
typedef struct {
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
void* metaBlk; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
} SStreamTaskInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
@ -146,11 +153,14 @@ typedef struct SExecTaskInfo {
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
SStreamTaskInfo streamInfo;
struct { struct {
char *tablename; char* tablename;
char *dbname; char* dbname;
int32_t tversion; int32_t tversion;
SSchemaWrapper*sw; SSchemaWrapper* sw;
} schemaVer; } schemaVer;
STableListInfo tableqinfoList; // this is a table list STableListInfo tableqinfoList; // this is a table list
@ -385,7 +395,7 @@ typedef struct SStreamScanInfo {
int32_t deleteDataIndex; int32_t deleteDataIndex;
// status for tmq // status for tmq
//SSchemaWrapper schema; // SSchemaWrapper schema;
STqOffset offset; STqOffset offset;
} SStreamScanInfo; } SStreamScanInfo;
@ -595,7 +605,7 @@ typedef struct SSessionAggOperatorInfo {
int64_t gap; // session window gap int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SNode *pCondition; const SNode* pCondition;
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SResultWindowInfo { typedef struct SResultWindowInfo {
@ -657,7 +667,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t tsSlotId; // primary timestamp column slot id int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
// bool reptScan; // bool reptScan;
const SNode *pCondition; const SNode* pCondition;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo { typedef struct SStreamStateAggOperatorInfo {

View File

@ -748,6 +748,7 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
SColumn c = {0}; SColumn c = {0};
c.slotId = pColNode->slotId; c.slotId = pColNode->slotId;
c.colId = pColNode->colId; c.colId = pColNode->colId;
c.type = pColNode->node.resType.type; c.type = pColNode->node.resType.type;
@ -774,6 +775,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->suid = pTableScanNode->scan.suid; pCond->suid = pTableScanNode->scan.suid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->type = BLOCK_LOAD_OFFSET_ORDER;
pCond->startVersion = -1;
pCond->endVersion = -1;
// pCond->type = pTableScanNode->scanFlag; // pCond->type = pTableScanNode->scanFlag;
int32_t j = 0; int32_t j = 0;

View File

@ -60,9 +60,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
} }
} else if (type == STREAM_INPUT__DATA_SCAN) { } else if (type == STREAM_INPUT__TABLE_SCAN) {
// do nothing // do nothing
ASSERT(pInfo->blockType == STREAM_INPUT__DATA_SCAN); ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -76,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__DATA_SCAN, 0, NULL); return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL);
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {

View File

@ -267,7 +267,46 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
return &pInfo->offset; return &pInfo->offset;
} }
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
return pTaskInfo->streamInfo.metaBlk;
}
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
return 0;
}
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.prepareStatus = *pOffset;
// TODO: optimize
/*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
/*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
while (1) {
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info;
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
return -1;
}
return 0;
} else {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
}
/*}*/
return 0;
}
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (uid == 0) { if (uid == 0) {

View File

@ -2852,7 +2852,7 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info; SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
pScanInfo->pTableScanOp->status = OP_OPENED; pScanInfo->pTableScanOp->status = OP_OPENED;
@ -3287,7 +3287,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead. // The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
// TODO optimize
/*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
/*}*/
break; break;
} }
if (pBlock->info.type == STREAM_RETRIEVE) { if (pBlock->info.type == STREAM_RETRIEVE) {

View File

@ -39,8 +39,8 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName); const char* dbName);
static void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock); SSDataBlock* pBlock, const char* idStr);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo); static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) { bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
@ -210,6 +210,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool allColumnsHaveAgg = true; bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL; SColumnDataAgg** pColAgg = NULL;
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
@ -263,7 +264,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup; SExprSupp* pSup = &pTableScanInfo->pseudoSup;
addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
} }
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -297,16 +302,21 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow); taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
} }
void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock) { SSDataBlock* pBlock, const char* idStr) {
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (numOfPseudoExpr == 0) { if (numOfPseudoExpr == 0) {
return; return TSDB_CODE_SUCCESS;
} }
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, pBlock->info.uid); int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
metaReaderClear(&mr);
return terrno;
}
for (int32_t j = 0; j < numOfPseudoExpr; ++j) { for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
SExprInfo* pExpr = &pPseudoExpr[j]; SExprInfo* pExpr = &pPseudoExpr[j];
@ -348,6 +358,7 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
} }
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) { void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
@ -679,34 +690,46 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
return pOperator; return pOperator;
} }
static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) { static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, const char* idstr) {
int32_t rowLen = 0; *rowLen = 0;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
metaGetTableEntryByUid(&mr, uid); int32_t code = metaGetTableEntryByUid(&mr, uid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", uid, tstrerror(terrno), idstr);
metaReaderClear(&mr);
return terrno;
}
if (mr.me.type == TSDB_SUPER_TABLE) { if (mr.me.type == TSDB_SUPER_TABLE) {
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; (*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
} }
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
uint64_t suid = mr.me.ctbEntry.suid; uint64_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid); code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
metaReaderClear(&mr);
return terrno;
}
int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; (*rowLen) += mr.me.stbEntry.schemaRow.pSchema[i].bytes;
} }
} else if (mr.me.type == TSDB_NORMAL_TABLE) { } else if (mr.me.type == TSDB_NORMAL_TABLE) {
int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols; int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes; (*rowLen) += mr.me.ntbEntry.schemaRow.pSchema[i].bytes;
} }
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return rowLen; return TSDB_CODE_SUCCESS;
} }
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
@ -715,9 +738,13 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
} }
SBlockDistInfo* pBlockScanInfo = pOperator->info; SBlockDistInfo* pBlockScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid); int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle);
@ -1130,15 +1157,124 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
uidCol[i] = getGroupId(pOperator, uidCol[i]); uidCol[i] = getGroupId(pOperator, uidCol[i]);
} }
} }
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
pInfo->pRes->info.rows = pBlock->info.rows;
pInfo->pRes->info.uid = pBlock->info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = pBlock->info.rows;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if (pInfo->assignBlockUid) {
pInfo->pRes->info.groupId = pBlock->info.uid;
}
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
pInfo->pRes->info.groupId = 0;
}
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
bool colExists = false;
for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
if (pResCol->info.colId == pColMatchInfo->colId) {
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
colExists = true;
break;
}
}
// the required column does not exists in submit block, let's set it to be all null value
if (!colExists) {
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
}
}
taosArrayDestroy(pBlock->pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return -1;
}
#endif
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
}
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0) {
return 0;
}
return 0;
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); /*pTaskInfo->code = pOperator->fpSet._openFn(pOperator);*/
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { /*if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {*/
/*return NULL;*/
/*}*/
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
/*} else {*/
/*tDeleteSSDataBlock(&ret.data);*/
}
} else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0);
pTaskInfo->streamInfo.lastStatus = ret.offset;
pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL; return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
if (ret.offset.version == -1) {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;
} else {
pTaskInfo->streamInfo.lastStatus = ret.offset;
}
return NULL;
} else {
ASSERT(0);
}
}
} }
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
@ -1146,7 +1282,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
/*doClearBufferedBlocks(pInfo);*/ /*doClearBufferedBlocks(pInfo);*/
pOperator->status = OP_EXEC_DONE; /*pOperator->status = OP_EXEC_DONE;*/
return NULL; return NULL;
} }
@ -1255,8 +1391,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.capacity = block.info.rows; pInfo->pRes->info.capacity = block.info.rows;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t)); uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &block.info.uid, sizeof(int64_t));
if (groupIdPre) { if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre; pInfo->pRes->info.groupId = *groupIdPre;
@ -1295,6 +1429,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
taosArrayDestroy(block.pDataBlock); taosArrayDestroy(block.pDataBlock);
ASSERT(pInfo->pRes->pDataBlock != NULL);
#if 0
if (pInfo->pRes->pDataBlock == NULL) { if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log // TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
@ -1302,10 +1439,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
} }
#endif
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
} }
doFilter(pInfo->pCondition, pInfo->pRes); doFilter(pInfo->pCondition, pInfo->pRes);
@ -1321,7 +1462,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pBlockInfo->rows == 0) { if (pBlockInfo->rows == 0) {
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE; /*pOperator->status = OP_EXEC_DONE;*/
} else if (pInfo->pUpdateInfo) { } else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0; pInfo->tsArrayIndex = 0;
checkUpdateData(pInfo, true, pInfo->pRes, true); checkUpdateData(pInfo, true, pInfo->pRes, true);
@ -1339,7 +1480,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) { } else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
// check reader last status // check reader last status
// if not match, reset status // if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
@ -1730,7 +1871,16 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0); metaReaderInit(&mr, pInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&mr, pInfo->pCur->mr.me.ctbEntry.suid);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%"PRIx64 ", code:%s, %s", suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
}
// number of columns // number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);
@ -2125,7 +2275,12 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
metaGetTableEntryByUid(&mr, item->uid); int32_t code = metaGetTableEntryByUid(&mr, item->uid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%"PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
longjmp(pTaskInfo->env, terrno);
}
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
@ -2410,8 +2565,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) { if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
pBlock); pBlock, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
} }
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();

View File

@ -57,7 +57,7 @@ typedef struct SScalarCtx {
#define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out); int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows); int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam);
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode); int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
#define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType) #define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType)

View File

@ -3827,13 +3827,20 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
SScalarParam output = {0}; SScalarParam output = {0};
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
output.columnData = sclCreateColumnInfoData(&type, pSrc->info.rows); int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SArray *pList = taosArrayInit(1, POINTER_BYTES); SArray *pList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pList, &pSrc); taosArrayPush(pList, &pSrc);
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
*p = (int8_t *)output.columnData->pData; *p = taosMemoryMalloc(output.numOfRows * sizeof(bool));
memcpy(*p, output.columnData->pData, output.numOfRows);
colDataDestroy(output.columnData);
taosMemoryFree(output.columnData);
taosArrayDestroy(pList); taosArrayDestroy(pList);
return false; return false;

View File

@ -35,12 +35,11 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows) {
SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData)); SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
if (pColumnData == NULL) { if (pColumnData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return terrno;
} }
pColumnData->info.type = pType->type; pColumnData->info.type = pType->type;
@ -52,19 +51,25 @@ SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pColumnData); taosMemoryFree(pColumnData);
return NULL; return terrno;
} else {
return pColumnData;
} }
pParam->columnData = pColumnData;
pParam->type = SHOULD_FREE_COLDATA;
return TSDB_CODE_SUCCESS;
} }
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) { int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
SScalarParam in = {.numOfRows = 1}; SScalarParam in = {.numOfRows = 1};
in.columnData = sclCreateColumnInfoData(&pValueNode->node.resType, 1); int32_t code = sclCreateColumnInfoData(&pValueNode->node.resType, 1, &in);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
colInfoDataEnsureCapacity(out->columnData, 1); colInfoDataEnsureCapacity(out->columnData, 1);
int32_t code = vectorConvertImpl(&in, out); code = vectorConvertImpl(&in, out);
sclFreeParam(&in); sclFreeParam(&in);
return code; return code;
@ -157,7 +162,7 @@ void sclFreeRes(SHashObj *res) {
void sclFreeParam(SScalarParam *param) { void sclFreeParam(SScalarParam *param) {
if (param->columnData != NULL) { if (param->columnData != NULL) {
colDataDestroy(param->columnData); colDataDestroy(param->columnData);
taosMemoryFree(param->columnData); taosMemoryFreeClear(param->columnData);
} }
if (param->pHashFilter != NULL) { if (param->pHashFilter != NULL) {
@ -190,8 +195,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
case QUERY_NODE_VALUE: { case QUERY_NODE_VALUE: {
SValueNode *valueNode = (SValueNode *)node; SValueNode *valueNode = (SValueNode *)node;
ASSERT(param->columnData == NULL);
param->numOfRows = 1; param->numOfRows = 1;
param->columnData = sclCreateColumnInfoData(&valueNode->node.resType, 1); /*int32_t code = */sclCreateColumnInfoData(&valueNode->node.resType, 1, param);
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
colDataAppendNULL(param->columnData, 0); colDataAppendNULL(param->columnData, 0);
} else { } else {
@ -429,10 +435,9 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
SCL_ERR_JRET(code); SCL_ERR_JRET(code);
} }
output->columnData = sclCreateColumnInfoData(&node->node.resType, rowNum); code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
if (output->columnData == NULL) { if (code != TSDB_CODE_SUCCESS) {
sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); SCL_ERR_JRET(code);
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
code = (*ffpSet.process)(params, paramNum, output); code = (*ffpSet.process)(params, paramNum, output);
@ -482,10 +487,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
output->numOfRows = rowNum; output->numOfRows = rowNum;
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
output->columnData = sclCreateColumnInfoData(&t, rowNum); code = sclCreateColumnInfoData(&t, rowNum, output);
if (output->columnData == NULL) { if (code != TSDB_CODE_SUCCESS) {
sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool))); SCL_ERR_JRET(code);
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
bool value = false; bool value = false;
@ -537,18 +541,19 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
int32_t code = 0; int32_t code = 0;
// json not support in in operator // json not support in in operator
if(nodeType(node->pLeft) == QUERY_NODE_VALUE){ if (nodeType(node->pLeft) == QUERY_NODE_VALUE) {
SValueNode *valueNode = (SValueNode *)node->pLeft; SValueNode *valueNode = (SValueNode *)node->pLeft;
if(valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)){ if (valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)) {
SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR); SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR);
} }
} }
SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum)); SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
output->columnData = sclCreateColumnInfoData(&node->node.resType, rowNum);
if (output->columnData == NULL) { if (output->columnData == NULL) {
sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes); code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); if (code != TSDB_CODE_SUCCESS) {
SCL_ERR_JRET(code);
}
} }
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType); _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);
@ -563,7 +568,10 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
_return: _return:
for (int32_t i = 0; i < paramNum; ++i) { for (int32_t i = 0; i < paramNum; ++i) {
// sclFreeParam(&params[i]); if (params[i].type == SHOULD_FREE_COLDATA) {
colDataDestroy(params[i].columnData);
taosMemoryFreeClear(params[i].columnData);
}
} }
taosMemoryFreeClear(params); taosMemoryFreeClear(params);
@ -766,7 +774,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
return sclRewriteNonConstOperator(pNode, ctx); return sclRewriteNonConstOperator(pNode, ctx);
} }
SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; SScalarParam output = {0};
ctx->code = sclExecOperator(node, ctx, &output); ctx->code = sclExecOperator(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
@ -834,6 +842,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
@ -868,6 +877,7 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
@ -1027,6 +1037,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
pDst->numOfRows = res->numOfRows; pDst->numOfRows = res->numOfRows;
} }
sclFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
} }

View File

@ -865,12 +865,11 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
} }
int32_t vectorConvertScalarParam(SScalarParam *input, SScalarParam *output, int32_t type) { int32_t vectorConvertScalarParam(SScalarParam *input, SScalarParam *output, int32_t type) {
int32_t code = 0;
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
output->numOfRows = input->numOfRows; output->numOfRows = input->numOfRows;
output->columnData = sclCreateColumnInfoData(&t, input->numOfRows); int32_t code = sclCreateColumnInfoData(&t, input->numOfRows, output);
if (output->columnData == NULL) { if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -940,13 +939,12 @@ static int32_t doConvertHelper(SScalarParam* pDest, int32_t* convert, const SSca
pDest->numOfRows = pParam->numOfRows; pDest->numOfRows = pParam->numOfRows;
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
pDest->columnData = sclCreateColumnInfoData(&t, pParam->numOfRows); int32_t code = sclCreateColumnInfoData(&t, pParam->numOfRows, pDest);
if (pDest->columnData == NULL) { if (code != TSDB_CODE_SUCCESS) {
sclError("malloc %d failed", (int32_t)(pParam->numOfRows * sizeof(double))); return code;
return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t code = vectorConvertImpl(pParam, pDest); code = vectorConvertImpl(pParam, pDest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -205,13 +205,19 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) {
*pNode = (SNode *)lnode; *pNode = (SNode *)lnode;
} }
void initScalarParam(SScalarParam* pParam) {
memset(pParam, 0, sizeof(SScalarParam));
pParam->type = SHOULD_FREE_COLDATA;
}
} }
TEST(timerangeTest, greater) { TEST(timerangeTest, greater) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL; SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL;
bool eRes[5] = {false, false, true, true, true}; bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int64_t tsmall = 222, tbig = 333; int64_t tsmall = 222, tbig = 333;
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall); flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
@ -234,7 +240,8 @@ TEST(timerangeTest, greater) {
TEST(timerangeTest, greater_and_lower) { TEST(timerangeTest, greater_and_lower) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL; SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL;
bool eRes[5] = {false, false, true, true, true}; bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int64_t tsmall = 222, tbig = 333; int64_t tsmall = 222, tbig = 333;
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall); flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
@ -265,7 +272,8 @@ TEST(timerangeTest, greater_and_lower) {
TEST(timerangeTest, greater_equal_and_lower_equal) { TEST(timerangeTest, greater_equal_and_lower_equal) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL; SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL;
bool eRes[5] = {false, false, true, true, true}; bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int64_t tsmall = 222, tbig = 333; int64_t tsmall = 222, tbig = 333;
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall); flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
@ -297,7 +305,8 @@ TEST(timerangeTest, greater_equal_and_lower_equal) {
TEST(timerangeTest, greater_and_lower_not_strict) { TEST(timerangeTest, greater_and_lower_not_strict) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL; SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL;
bool eRes[5] = {false, false, true, true, true}; bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int64_t tsmall1 = 222, tbig1 = 333; int64_t tsmall1 = 222, tbig1 = 333;
int64_t tsmall2 = 444, tbig2 = 555; int64_t tsmall2 = 444, tbig2 = 555;
SNode *list[2] = {0}; SNode *list[2] = {0};
@ -350,7 +359,8 @@ TEST(columnTest, smallint_column_greater_double_value) {
double rightv= 2.5; double rightv= 2.5;
int8_t eRes[5] = {0, 0, 1, 1, 1}; int8_t eRes[5] = {0, 0, 1, 1, 1};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]); int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_DOUBLE, &rightv); flttMakeValueNode(&pRight, TSDB_DATA_TYPE_DOUBLE, &rightv);
@ -405,7 +415,8 @@ TEST(columnTest, int_column_greater_smallint_value) {
int16_t rightv= 4; int16_t rightv= 4;
int8_t eRes[5] = {0, 0, 1, 1, 1}; int8_t eRes[5] = {0, 0, 1, 1, 1};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]); int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, leftv);
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_SMALLINT, &rightv); flttMakeValueNode(&pRight, TSDB_DATA_TYPE_SMALLINT, &rightv);
@ -460,7 +471,8 @@ TEST(columnTest, int_column_in_double_list) {
double rightv1 = 1.1,rightv2 = 2.2,rightv3 = 3.3; double rightv1 = 1.1,rightv2 = 2.2,rightv3 = 3.3;
bool eRes[5] = {true, true, true, false, false}; bool eRes[5] = {true, true, true, false, false};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]); int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, leftv);
SNodeList* list = nodesMakeList(); SNodeList* list = nodesMakeList();
@ -503,7 +515,8 @@ TEST(columnTest, binary_column_in_binary_list) {
SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL; SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL;
bool eRes[5] = {true, true, false, false, false}; bool eRes[5] = {true, true, false, false, false};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
char leftv[5][5]= {0}; char leftv[5][5]= {0};
char rightv[3][5]= {0}; char rightv[3][5]= {0};
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {
@ -567,7 +580,8 @@ TEST(columnTest, binary_column_like_binary) {
char rightv[64] = {0}; char rightv[64] = {0};
char leftv[5][5]= {0}; char leftv[5][5]= {0};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
bool eRes[5] = {true, false, true, false, true}; bool eRes[5] = {true, false, true, false, true};
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {
@ -614,7 +628,8 @@ TEST(columnTest, binary_column_is_null) {
SNode *pLeft = NULL, *opNode = NULL; SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0}; char leftv[5][5]= {0};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
bool eRes[5] = {false, false, true, false, true}; bool eRes[5] = {false, false, true, false, true};
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {
@ -661,7 +676,8 @@ TEST(columnTest, binary_column_is_not_null) {
SNode *pLeft = NULL, *opNode = NULL; SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0}; char leftv[5][5]= {0};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
bool eRes[5] = {true, true, true, true, false}; bool eRes[5] = {true, true, true, true, false};
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {
@ -710,7 +726,8 @@ TEST(opTest, smallint_column_greater_int_column) {
int32_t rightv[5]= {0, -5, -4, 23, 100}; int32_t rightv[5]= {0, -5, -4, 23, 100};
bool eRes[5] = {true, false, true, false, true}; bool eRes[5] = {true, false, true, false, true};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]); int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, rightv); flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_INT, sizeof(int32_t), rowNum, rightv);
@ -747,7 +764,8 @@ TEST(opTest, smallint_value_add_int_column) {
int16_t rightv[5]= {0, -1, -4, -1, 100}; int16_t rightv[5]= {0, -1, -4, -1, 100};
bool eRes[5] = {true, false, true, false, true}; bool eRes[5] = {true, false, true, false, true};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]); int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeValueNode(&pLeft, TSDB_DATA_TYPE_INT, &leftv); flttMakeValueNode(&pLeft, TSDB_DATA_TYPE_INT, &leftv);
flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, rightv); flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, rightv);
@ -790,7 +808,8 @@ TEST(opTest, bigint_column_multi_binary_column) {
} }
bool eRes[5] = {false, true, true, true, true}; bool eRes[5] = {false, true, true, true, true};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]); int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), rowNum, leftv);
flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_BINARY, 5, rowNum, rightv); flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_BINARY, 5, rowNum, rightv);
@ -833,7 +852,8 @@ TEST(opTest, smallint_column_and_binary_column) {
} }
bool eRes[5] = {false, false, true, false, true}; bool eRes[5] = {false, false, true, false, true};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]); int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_BINARY, 5, rowNum, rightv); flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_BINARY, 5, rowNum, rightv);
@ -871,7 +891,8 @@ TEST(opTest, smallint_column_or_float_column) {
float rightv[5]= {2.0, 3.0, 0, 5.2, 6.0}; float rightv[5]= {2.0, 3.0, 0, 5.2, 6.0};
bool eRes[5] = {true, true, false, true, true}; bool eRes[5] = {true, true, false, true, true};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]); int32_t rowNum = sizeof(rightv)/sizeof(rightv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_FLOAT, sizeof(float), rowNum, rightv); flttMakeColumnNode(&pRight, &src, TSDB_DATA_TYPE_FLOAT, sizeof(float), rowNum, rightv);
@ -909,7 +930,8 @@ TEST(opTest, smallint_column_or_double_value) {
double rightv= 10.2; double rightv= 10.2;
bool eRes[5] = {true, true, true, true, true}; bool eRes[5] = {true, true, true, true, true};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]); int32_t rowNum = sizeof(leftv)/sizeof(leftv[0]);
flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv); flttMakeColumnNode(&pLeft, &src, TSDB_DATA_TYPE_SMALLINT, sizeof(int16_t), rowNum, leftv);
flttMakeValueNode(&pRight, TSDB_DATA_TYPE_DOUBLE, &rightv); flttMakeValueNode(&pRight, TSDB_DATA_TYPE_DOUBLE, &rightv);
@ -945,7 +967,8 @@ TEST(opTest, binary_column_is_true) {
SNode *pLeft = NULL, *opNode = NULL; SNode *pLeft = NULL, *opNode = NULL;
char leftv[5][5]= {0}; char leftv[5][5]= {0};
SSDataBlock *src = NULL; SSDataBlock *src = NULL;
SScalarParam res = {0}; SScalarParam res;
initScalarParam(&res);
bool eRes[5] = {false, true, false, true, false}; bool eRes[5] = {false, true, false, true, false};
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {

View File

@ -150,9 +150,10 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
return 0; return 0;
} }
static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) { if (ver == pRead->curVersion) {
wDebug("wal version %ld match, no need to reset", ver);
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
@ -181,6 +182,8 @@ static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
return -1; return -1;
} }
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
pRead->curVersion = ver; pRead->curVersion = ver;
return 0; return 0;
@ -191,7 +194,10 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen; int64_t contLen;
if (pRead->curVersion != fetchVer) { if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) return -1; if (walReadSeekVer(pRead, fetchVer) < 0) {
ASSERT(0);
return -1;
}
} }
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen != sizeof(SWalCkHead)) { if (contLen != sizeof(SWalCkHead)) {
@ -200,6 +206,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0);
pRead->curVersion = -1; pRead->curVersion = -1;
return -1; return -1;
} }
@ -254,6 +261,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
} }
pRead->curVersion = ver + 1; pRead->curVersion = ver + 1;
wDebug("version advance to %ld, fetch body", pRead->curVersion);
return 0; return 0;
} }
@ -266,10 +274,12 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1; pRead->curVersion = -1;
ASSERT(0);
return -1; return -1;
} }
pRead->curVersion++; pRead->curVersion++;
wDebug("version advance to %ld, skip fetch", pRead->curVersion);
return 0; return 0;
} }
@ -362,22 +372,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return 0; return 0;
} }
int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadVer(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
*ppHead = taosMemoryMalloc(sizeof(SWalCont) + pRead->pHead->head.bodyLen);
if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalCont) + pRead->pHead->head.bodyLen);
taosThreadMutexUnlock(&pRead->mutex);
return 0;
}
int32_t walReadVer(SWalReader *pRead, int64_t ver) { int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code; int64_t code;

View File

@ -80,7 +80,7 @@
./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim ./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim ./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim #./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim ./test.sh -f tsim/mnode/basic5.sim
# ---- show # ---- show

View File

@ -162,28 +162,28 @@ class TDTestCase:
keyDict['h'] = 'abc' keyDict['h'] = 'abc'
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
keyDict['h'] = '\'abc\'' keyDict['h'] = '\'abc\''
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
keyDict['h'] = '3' keyDict['h'] = '3'
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
keyDict['h'] = '\'3\'' keyDict['h'] = '\'3\''
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
@ -193,42 +193,42 @@ class TDTestCase:
keyDict['P'] = 'abc' keyDict['P'] = 'abc'
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal): if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '\'abc\'' keyDict['P'] = '\'abc\''
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal): if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '3' keyDict['P'] = '3'
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '\'3\'' keyDict['P'] = '\'3\''
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '12ab' keyDict['P'] = '12ab'
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '\'12ab\'' keyDict['P'] = '\'12ab\''
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
@ -293,7 +293,7 @@ class TDTestCase:
keyDict['p'] = 'errorPassword' keyDict['p'] = 'errorPassword'
retCode, retVal = taos_command(buildPath, "u", keyDict['u'], "taos>", keyDict['c'], sqlString, 'p', keyDict['p']) retCode, retVal = taos_command(buildPath, "u", keyDict['u'], "taos>", keyDict['c'], sqlString, 'p', keyDict['p'])
if retCode == "TAOS_FAIL" and "Authentication failure" in retVal: if retCode == "TAOS_FAIL" and "Authentication failure" in retVal:
tdLog.info("taos -p %s test suceess"%keyDict['p']) tdLog.info("taos -p %s test success"%keyDict['p'])
else: else:
tdLog.exit("taos -u %s -p %s"%(keyDict['u'], keyDict['p'])) tdLog.exit("taos -u %s -p %s"%(keyDict['u'], keyDict['p']))

View File

@ -1 +1,2 @@
#python3 ./test.py -f 2-query/last.py -Q 3 #python3 ./test.py -f 2-query/last.py -Q 3
#./test.sh -f tsim/mnode/basic4.sim