Merge branch '3.0' of github.com:taosdata/TDengine into szhou/feature/push-project-condition

This commit is contained in:
slzhou 2022-07-07 15:31:46 +08:00
commit c8104d5141
41 changed files with 520 additions and 374 deletions

View File

@ -241,7 +241,7 @@ int32_t create_topic() {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -302,7 +302,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "false"); /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);

View File

@ -32,6 +32,18 @@ enum {
TMQ_CONF__RESET_OFFSET__LATEST = -1, TMQ_CONF__RESET_OFFSET__LATEST = -1,
}; };
// clang-format off
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_RSP,

View File

@ -2826,8 +2826,8 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); // tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); // tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType); tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen); tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen); tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
} }
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); // buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); // buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);

View File

@ -30,15 +30,15 @@ struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef struct SReadHandle { typedef struct SReadHandle {
void* reader; void* streamReader;
void* meta; void* meta;
void* config; void* config;
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
bool initMetaReader;
// int8_t initTsdbReader; bool initTableReader;
bool tqReader; bool initStreamReader;
} SReadHandle; } SReadHandle;
typedef enum { typedef enum {

View File

@ -223,7 +223,7 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SStreamChildEpInfo; } SStreamChildEpInfo;
struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int8_t isDataScan; int8_t isDataScan;
@ -235,6 +235,11 @@ struct SStreamTask {
int8_t taskStatus; int8_t taskStatus;
int8_t execStatus; int8_t execStatus;
// exec info
int64_t enqueueVer;
int64_t processedVer;
int64_t checkpointVer;
// node info // node info
int32_t selfChildId; int32_t selfChildId;
int32_t nodeId; int32_t nodeId;
@ -277,7 +282,7 @@ struct SStreamTask {
// msg handle // msg handle
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
}; } SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) { while (1) {
int8_t inputStatus = int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING); atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
} }
ASSERT(0); ASSERT(0);
} }
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE); atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
} }
#if 0
// TODO: back pressure // TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0; return 0;
} }

View File

@ -88,7 +88,7 @@ typedef struct {
EWalType level; // wal level EWalType level; // wal level
} SWalCfg; } SWalCfg;
typedef struct SWalVer { typedef struct {
int64_t firstVer; int64_t firstVer;
int64_t verInSnapshotting; int64_t verInSnapshotting;
int64_t snapshotVer; int64_t snapshotVer;
@ -149,17 +149,22 @@ typedef struct SWal {
SWalCkHead writeHead; SWalCkHead writeHead;
} SWal; // WAL HANDLE } SWal; // WAL HANDLE
typedef struct SWalReadHandle { typedef struct {
SWal *pWal; int8_t scanUncommited;
TdFilePtr pReadLogTFile; int8_t scanMeta;
TdFilePtr pReadIdxTFile; } SWalFilterCond;
int64_t curFileFirstVer;
int64_t curVersion; typedef struct {
int64_t capacity; SWal *pWal;
int64_t status; // if cursor valid TdFilePtr pLogFile;
TdThreadMutex mutex; TdFilePtr pIdxFile;
SWalCkHead *pHead; int64_t curFileFirstVer;
} SWalReadHandle; int64_t curVersion;
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead;
} SWalReader;
// module initialization // module initialization
int32_t walInit(); int32_t walInit();
@ -178,7 +183,6 @@ void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management
int32_t walCommit(SWal *, int64_t ver); int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver); int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely // notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver); int32_t walBeginSnapshot(SWal *, int64_t ver);
@ -187,15 +191,16 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*); // int32_t walDataCorrupted(SWal*);
// read // read
SWalReadHandle *walOpenReadHandle(SWal *); SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReadHandle(SWalReadHandle *); void walCloseReader(SWalReader *pRead);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity); void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct { typedef struct {
int64_t refId; int64_t refId;
@ -207,10 +212,11 @@ void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver); int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *); int32_t walUnrefVer(SWal *);
// help function for raft
bool walLogExist(SWal *, int64_t ver); bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *);
// lifecycle check // lifecycle check
bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *); int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *); int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *); int64_t walGetLastVer(SWal *);

View File

@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
char *strDupUnquo(const char *src);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context; T_MD5_CTX context;

View File

@ -50,19 +50,18 @@ struct tmq_list_t {
}; };
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[256];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
int8_t withTbName; int8_t withTbName;
int8_t spEnable; int8_t spEnable;
int32_t spBatchSize; int32_t spBatchSize;
uint16_t port; uint16_t port;
int32_t autoCommitInterval; int32_t autoCommitInterval;
char* ip; char* ip;
char* user; char* user;
char* pass; char* pass;
/*char* db;*/
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
}; };
@ -338,7 +337,7 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* list, const char* src) { int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container; SArray* container = &list->container;
char* topic = strdup(src); char* topic = strDupUnquo(src);
if (taosArrayPush(container, &topic) == NULL) return -1; if (taosArrayPush(container, &topic) == NULL) return -1;
return 0; return 0;
} }

View File

@ -1737,41 +1737,54 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id %lu|\n", flag, len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id %lu|\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId); (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
if (len >= size -1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag); len += snprintf(dumpBuf + len, size - len, "%s |", flag);
if (len >= size -1) return dumpBuf;
for (int32_t k = 0; k < colNum; k++) { for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL)) { if (colDataIsNull(pColInfoData, rows, j, NULL)) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL"); len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
if (len >= size -1) return dumpBuf;
continue; continue;
} }
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf); len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
if (len >= size -1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var); len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
if (len >= size -1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var); len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
if (len >= size -1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var); len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
if (len >= size -1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var); len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
if (len >= size -1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var); len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
if (len >= size -1) return dumpBuf;
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var); len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
if (len >= size -1) return dumpBuf;
break; break;
} }
} }
len += snprintf(dumpBuf + len, size - len, "\n"); len += snprintf(dumpBuf + len, size - len, "\n");
if (len >= size -1) return dumpBuf;
} }
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
return dumpBuf; return dumpBuf;

View File

@ -40,15 +40,6 @@ extern "C" {
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on // clang-format on
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
@ -128,7 +119,7 @@ typedef struct {
int8_t fetchMeta; int8_t fetchMeta;
// reader // reader
SWalReadHandle* pWalReader; SWalReader* pWalReader;
// push // push
STqPushHandle pushHandle; STqPushHandle pushHandle;

View File

@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
} }
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .streamReader = pReadHandle,
.meta = pMeta, .meta = pMeta,
.pMsgCb = pMsgCb, .pMsgCb = pMsgCb,
.vnode = pVnode, .vnode = pVnode,

View File

@ -28,8 +28,12 @@ int32_t tqInit() {
atomic_store_8(&tqMgmt.inited, 0); atomic_store_8(&tqMgmt.inited, 0);
return -1; return -1;
} }
if (streamInit() < 0) {
return -1;
}
atomic_store_8(&tqMgmt.inited, 1); atomic_store_8(&tqMgmt.inited, 1);
} }
return 0; return 0;
} }
@ -42,6 +46,7 @@ void tqCleanUp() {
if (old == 1) { if (old == 1) {
taosTmrCleanUp(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer);
streamCleanUp();
atomic_store_8(&tqMgmt.inited, 0); atomic_store_8(&tqMgmt.inited, 0);
} }
} }
@ -144,7 +149,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp); tEncodeSMqDataRsp(&encoder, pRsp);
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(IS_META_MSG(pHead->msgType)); ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version; /*metaRsp.reqOffset = pReq->reqOffset.version;*/
metaRsp.rspOffset = fetchVer; /*metaRsp.rspOffset = fetchVer;*/
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body; metaRsp.metaRsp = pHead->body;
@ -439,7 +446,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType; pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta; pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
} }
@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL; req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = { SReadHandle handle = {
.reader = pHandle->execHandle.pExecReader[i], .streamReader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.tqReader = true, .initTableReader = true,
}; };
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
@ -522,19 +529,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
// expand runners // expand runners
if (pTask->isDataScan) { if (pTask->isDataScan) {
SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = {
SReadHandle handle = { .meta = pTq->pVnode->pMeta,
.reader = pStreamReader, .vnode = pTq->pVnode,
.meta = pTq->pVnode->pMeta, .initStreamReader = 1,
.vnode = pTq->pVnode,
}; };
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else { } else {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
} }
ASSERT(pTask->exec.executor);
} }
// sink // sink

View File

@ -77,14 +77,14 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle; STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
} }
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = { SReadHandle reader = {
.reader = handle.execHandle.pExecReader[i], .streamReader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,

View File

@ -168,8 +168,6 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
tb_uid_t suid; tb_uid_t suid;
tb_uid_t uid; tb_uid_t uid;
taosArrayClear(pCommitter->aDelData);
if (pTbData) { if (pTbData) {
suid = pTbData->suid; suid = pTbData->suid;
uid = pTbData->uid; uid = pTbData->uid;
@ -185,6 +183,8 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL); code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
if (code) goto _err; if (code) goto _err;
} else {
taosArrayClear(pCommitter->aDelData);
} }
if (pTbData == NULL && pDelIdx == NULL) goto _exit; if (pTbData == NULL && pDelIdx == NULL) goto _exit;
@ -205,7 +205,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
if (code) goto _err; if (code) goto _err;
// put delIdx // put delIdx
if (taosArrayPush(pCommitter->aDelIdx, &delIdx) == NULL) { if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
@ -854,7 +854,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if (pCommitter->pReader) { if (pCommitter->pReader) {
code = tsdbDataFReaderClose(&pCommitter->pReader); code = tsdbDataFReaderClose(&pCommitter->pReader);
goto _err; if (code) goto _err;
} }
_exit: _exit:

View File

@ -520,7 +520,7 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb can and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }

View File

@ -49,7 +49,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
} }
pDelFWriter->fDel.size = TSDB_FHDR_SIZE; pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
pDelFWriter->fDel.size = 0; pDelFWriter->fDel.offset = 0;
*ppWriter = pDelFWriter; *ppWriter = pDelFWriter;
return code; return code;

View File

@ -349,7 +349,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t t = ntohl(*(int32_t *)pReq); int32_t t = ntohl(*(int32_t *)pReq);
vError("rec ttl time:%d", t); vDebug("rec ttl time:%d", t);
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids); int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
if (ret != 0) { if (ret != 0) {
goto end; goto end;

View File

@ -348,7 +348,7 @@ typedef struct SessionWindowSupporter {
uint8_t parentType; uint8_t parentType;
} SessionWindowSupporter; } SessionWindowSupporter;
typedef struct SStreamBlockScanInfo { typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
@ -365,7 +365,7 @@ typedef struct SStreamBlockScanInfo {
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle void* streamReader;// stream block reader handle
int32_t tsArrayIndex; int32_t tsArrayIndex;
SArray* tsArray; SArray* tsArray;
@ -374,7 +374,7 @@ typedef struct SStreamBlockScanInfo {
EStreamScanMode scanMode; EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp; SOperatorInfo* pStreamScanOp;
SOperatorInfo* pSnapshotReadOp; SOperatorInfo* pTableScanOp;
SArray* childIds; SArray* childIds;
SessionWindowSupporter sessionSup; SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
@ -383,7 +383,7 @@ typedef struct SStreamBlockScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex; int32_t deleteDataIndex;
} SStreamBlockScanInfo; } SStreamScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp; SRetrieveMetaTableRsp* pRsp;

View File

@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else { } else {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid; pInfo->assignBlockUid = assignUid;
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = type; pInfo->blockType = type;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo; return pTaskInfo;
} }
static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) { static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table. // let's discard the tables those are not created according to the queried super table.
@ -168,17 +168,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo = pInfo->pDownstream[0]; pInfo = pInfo->pDownstream[0];
} }
int32_t code = 0; int32_t code = 0;
SStreamBlockScanInfo* pScanInfo = pInfo->info; SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id if (isAdd) { // add new table id
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
taosArrayDestroy(qa); taosArrayDestroy(qa);
} else { // remove the table id in current list } else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
code = tqReadHandleRemoveTbUidList(pScanInfo->streamBlockReader, tableIdList); code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
} }
return code; return code;

View File

@ -2849,12 +2849,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info; SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
pScanInfo->pSnapshotReadOp->status = OP_OPENED; pScanInfo->pTableScanOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER); ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) { if (uid == 0) {
@ -2914,8 +2914,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info; SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
*uid = pSnapShotScanInfo->lastStatus.uid; *uid = pSnapShotScanInfo->lastStatus.uid;
*ts = pSnapShotScanInfo->lastStatus.ts; *ts = pSnapShotScanInfo->lastStatus.ts;
} else { } else {
@ -4621,9 +4621,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
} }
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo); return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pSnapshotReadOp->info; *ppInfo = pInfo->pTableScanOp->info;
return 0; return 0;
} }
} }

View File

@ -780,7 +780,7 @@ _error:
return NULL; return NULL;
} }
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
@ -791,21 +791,20 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
} }
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { static bool isSessionWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
} }
static bool isStateWindow(SStreamBlockScanInfo* pInfo) { static bool isStateWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
} }
static bool isIntervalWindow(SStreamBlockScanInfo* pInfo) { static bool isIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL; pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
} }
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupId) { if (groupId) {
@ -827,19 +826,17 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
*/ */
} }
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
ASSERT(rowIndex < pBlock->info.rows); ASSERT(rowIndex < pBlock->info.rows);
switch (pBlock->info.type) switch (pBlock->info.type) {
{ case STREAM_DELETE_DATA:
case STREAM_DELETE_DATA: case STREAM_RETRIEVE: {
case STREAM_RETRIEVE: { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); uint64_t* groupCol = (uint64_t*)pColInfo->pData;
uint64_t* groupCol = (uint64_t*)pColInfo->pData; pInfo->groupId = groupCol[rowIndex];
pInfo->groupId = groupCol[rowIndex]; } break;
} default:
break; break;
default:
break;
} }
} }
@ -854,17 +851,17 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
} }
static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) { static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
if ((*pRowIndex) == pBlock->info.rows) { if ((*pRowIndex) == pBlock->info.rows) {
return false; return false;
} }
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData; TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData; TSKEY* endData = (TSKEY*)pEndTsCol->pData;
STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]}; STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex)++; (*pRowIndex)++;
@ -877,16 +874,16 @@ static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, i
win.skey = TMIN(win.skey, startData[*pRowIndex]); win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue; continue;
} }
ASSERT( (win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
( isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0) ) ); (isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
break; break;
} }
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win); resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true; return true;
} }
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = { STimeWindow win = {
.skey = INT64_MIN, .skey = INT64_MIN,
.ekey = INT64_MAX, .ekey = INT64_MAX,
@ -907,11 +904,10 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else { } else {
win = win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, (*pRowIndex) +=
TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
needRead = true; needRead = true;
} else if (isStateWindow(pInfo)) { } else if (isStateWindow(pInfo)) {
@ -929,7 +925,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
if (!needRead) { if (!needRead) {
return false; return false;
} }
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win); resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true; return true;
} }
@ -946,13 +942,13 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest->info.rows++; dest->info.rows++;
} }
static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) { if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
} }
if (!pResult) { if (!pResult) {
blockDataCleanup(pSDB); blockDataCleanup(pSDB);
@ -966,14 +962,14 @@ static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB,
} }
} }
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
if (pResult == NULL) { if (pResult == NULL) {
if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) { if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp); pResult = doTableScan(pInfo->pTableScanOp);
} }
} }
if (!pResult) { if (!pResult) {
@ -997,8 +993,8 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
return pResult; return pResult;
*/ */
} }
static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) { SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) { if (pDelBlock->info.rows == 0) {
return; return;
} }
@ -1006,18 +1002,20 @@ static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlo
blockDataEnsureCapacity(pUpdateRes, 64); blockDataEnsureCapacity(pUpdateRes, 64);
ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3); ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData; TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData; TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData; uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows && for (int32_t i = pInfo->deleteDataIndex;
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) { i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1;
i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]); uint64_t groupId = getGroupId(pOperator, uidCol[i]);
for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) { for (TSKEY startTs = startData[i]; startTs <= endData[i];) {
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false); colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false); colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
pUpdateRes->info.rows++; pUpdateRes->info.rows++;
@ -1032,33 +1030,36 @@ static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlo
} }
} }
static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) { static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator,
if (pBlock->info.rows == 0) { SSDataBlock* pUpdateRes) {
if (pBlock->info.rows == 0) {
return; return;
} }
blockDataCleanup(pUpdateRes); blockDataCleanup(pUpdateRes);
blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows); blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows);
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData; TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData; TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData; uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
int32_t dummy = 0; int32_t dummy = 0;
for (int32_t i = 0 ; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]); uint64_t groupId = getGroupId(pOperator, uidCol[i]);
//gap must be 0. // gap must be 0.
SResultWindowInfo* pStartWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy); SResultWindowInfo* pStartWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
if (!pStartWin) { if (!pStartWin) {
// window has been closed. // window has been closed.
continue; continue;
} }
SResultWindowInfo* pEndWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy); SResultWindowInfo* pEndWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
ASSERT(pEndWin); ASSERT(pEndWin);
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false); colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false); colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
@ -1066,7 +1067,7 @@ static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock,
pUpdateRes->info.rows++; pUpdateRes->info.rows++;
} }
} }
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock); blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray); int32_t size = taosArrayGetSize(pInfo->tsArray);
if (pInfo->tsArrayIndex < size) { if (pInfo->tsArrayIndex < size) {
@ -1075,11 +1076,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
blockDataEnsureCapacity(pUpdateBlock, size); blockDataEnsureCapacity(pUpdateBlock, size);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
int32_t i = 0; int32_t i = 0;
for (; i < size; i++) { for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
if (pInfo->groupId != id) { if (pInfo->groupId != id) {
break; break;
} }
@ -1098,12 +1099,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
} }
if (size == 0) { if (size == 0) {
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock); generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
} }
} }
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
bool out) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData; TSKEY* ts = (TSKEY*)pColDataInfo->pData;
@ -1119,15 +1119,15 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex);
uint64_t* uidCol = (uint64_t*)pColDataInfo->pData; uint64_t* uidCol = (uint64_t*)pColDataInfo->pData;
ASSERT(pBlock->info.rows > 0); ASSERT(pBlock->info.rows > 0);
for (int32_t i = 0 ; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
uidCol[i] = getGroupId(pOperator, uidCol[i]); uidCol[i] = getGroupId(pOperator, uidCol[i]);
} }
} }
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
pTaskInfo->code = pOperator->fpSet._openFn(pOperator); pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
@ -1145,36 +1145,35 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
// TODO move into scan
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) { switch (pBlock->info.type) {
case STREAM_RETRIEVE:{ case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock); copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0; pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex); prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} } break;
break; case STREAM_DELETE_DATA: {
case STREAM_DELETE_DATA: { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0;
pInfo->updateResIndex = 0; if (isIntervalWindow(pInfo)) {
if (isIntervalWindow(pInfo)) { copyDataBlock(pInfo->pDeleteDataRes, pBlock);
copyDataBlock(pInfo->pDeleteDataRes, pBlock); generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes); prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; } else {
} else { generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes);
generateScanRange(pInfo, pBlock, pInfo->pSnapshotReadOp, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; }
} pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; return pInfo->pUpdateRes;
return pInfo->pUpdateRes; } break;
} default:
break; break;
default:
break;
} }
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
@ -1233,11 +1232,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->streamBlockReader)) { while (tqNextDataBlock(pInfo->streamReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
// todo refactor // todo refactor
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader); int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader);
uint64_t groupId = block.info.groupId; uint64_t groupId = block.info.groupId;
uint64_t uid = block.info.uid; uint64_t uid = block.info.uid;
@ -1333,11 +1332,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
} }
} }
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
// check reader last status // check reader last status
// if not match, reset status // if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL; return pResult && pResult->info.rows > 0 ? pResult : NULL;
} else { } else {
@ -1361,8 +1362,8 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
uint64_t taskId) { uint64_t taskId) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
@ -1400,13 +1401,25 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} }
if (pHandle) { if (pHandle) {
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
if (pHandle->tqReader) { if (pHandle->initTableReader) {
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, 0); pSTInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) {
ASSERT(0);
}
}
if (pHandle->initStreamReader) {
ASSERT(pHandle->streamReader == NULL);
pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta);
ASSERT(pInfo->streamReader);
} else {
ASSERT(pHandle->streamReader);
pInfo->streamReader = pHandle->streamReader;
} }
if (pSTInfo->interval.interval > 0) { if (pSTInfo->interval.interval > 0) {
@ -1414,18 +1427,17 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
} else { } else {
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
} }
pInfo->pSnapshotReadOp = pTableScanDummy;
pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pSTInfo->interval; pInfo->interval = pSTInfo->interval;
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
ASSERT(pHandle->reader);
pInfo->streamBlockReader = pHandle->reader;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds); tqReadHandleSetColIdList(pInfo->streamReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); int32_t code = tqReadHandleSetTbUidList(pInfo->streamReader, tableIdList);
if (code != 0) { if (code != 0) {
taosArrayDestroy(tableIdList); taosArrayDestroy(tableIdList);
goto _error; goto _error;
@ -1449,7 +1461,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->deleteDataIndex = 0; pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock(); pInfo->pDeleteDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator"; pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
@ -1458,7 +1470,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
return pOperator; return pOperator;

View File

@ -3023,7 +3023,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark, void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
uint8_t type) { uint8_t type) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamBlockScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type}; pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type};
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
} }
@ -3580,8 +3580,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
SArray* pClosed, __get_win_info_ fn, bool delete) { bool delete) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void** pIte = NULL; void** pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
@ -3743,8 +3743,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
getResWinForSession, pInfo->ignoreExpiredData); pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pStUpdated, pUpdated); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
@ -4245,8 +4245,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
getResWinForState, pInfo->ignoreExpiredData); pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pSeUpdated, pUpdated); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);

View File

@ -17,6 +17,7 @@
#define _STREAM_INC_H_ #define _STREAM_INC_H_
#include "executor.h" #include "executor.h"
#include "tref.h"
#include "tstream.h" #include "tstream.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -24,8 +25,9 @@ extern "C" {
#endif #endif
typedef struct { typedef struct {
int8_t inited; int8_t inited;
void* timer; int32_t refPool;
void* timer;
} SStreamGlobalEnv; } SStreamGlobalEnv;
static SStreamGlobalEnv streamEnv; static SStreamGlobalEnv streamEnv;

View File

@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
if (streamInit() < 0) {
return -1;
}
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
} }

View File

@ -32,8 +32,8 @@ typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
SWal* pWal; SWal* pWal;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalReadHandle* pWalHandle; SWalReader* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0 // SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData; } SSyncLogStoreData;

View File

@ -62,7 +62,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
ASSERT(pData->pWal != NULL); ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL); taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal); pData->pWalHandle = walOpenReader(pData->pWal, NULL);
ASSERT(pData->pWalHandle != NULL); ASSERT(pData->pWalHandle != NULL);
pLogStore->appendEntry = logStoreAppendEntry; pLogStore->appendEntry = logStoreAppendEntry;
@ -95,7 +95,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) { if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle); walCloseReader(pData->pWalHandle);
pData->pWalHandle = NULL; pData->pWalHandle = NULL;
} }
taosThreadMutexUnlock(&(pData->mutex)); taosThreadMutexUnlock(&(pData->mutex));
@ -255,7 +255,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL; *ppEntry = NULL;
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle; SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) { if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
@ -263,7 +263,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
code = walReadWithHandle(pWalHandle, index); code = walReadVer(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
@ -398,10 +398,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
taosThreadMutexLock(&(pData->mutex)); taosThreadMutexLock(&(pData->mutex));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle; SWalReader* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL); ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index); int32_t code = walReadVer(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);

View File

@ -19,6 +19,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "wal.h" #include "wal.h"

View File

@ -16,20 +16,29 @@
#include "taoserror.h" #include "taoserror.h"
#include "walInt.h" #include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle)); static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
if (pRead == NULL) { if (pRead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pRead->pWal = pWal; pRead->pWal = pWal;
pRead->pReadIdxTFile = NULL; pRead->pIdxFile = NULL;
pRead->pReadLogTFile = NULL; pRead->pLogFile = NULL;
pRead->curVersion = -1; pRead->curVersion = -1;
pRead->curFileFirstVer = -1; pRead->curFileFirstVer = -1;
pRead->capacity = 0; pRead->capacity = 0;
pRead->status = 0; if (cond)
pRead->cond = *cond;
else {
pRead->cond.scanMeta = 0;
pRead->cond.scanUncommited = 0;
}
taosThreadMutexInit(&pRead->mutex, NULL); taosThreadMutexInit(&pRead->mutex, NULL);
@ -39,26 +48,46 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
taosMemoryFree(pRead); taosMemoryFree(pRead);
return NULL; return NULL;
} }
return pRead; return pRead;
} }
void walCloseReadHandle(SWalReadHandle *pRead) { void walCloseReader(SWalReader *pRead) {
taosCloseFile(&pRead->pReadIdxTFile); taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pReadLogTFile); taosCloseFile(&pRead->pLogFile);
taosMemoryFreeClear(pRead->pHead); taosMemoryFreeClear(pRead->pHead);
taosMemoryFree(pRead); taosMemoryFree(pRead);
} }
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { int32_t walNextValidMsg(SWalReader *pRead) {
// TODO int64_t fetchVer = pRead->curVersion;
return 0; int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
while (fetchVer <= endVer) {
if (walFetchHeadNew(pRead, fetchVer) < 0) {
return -1;
}
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
if (walFetchBodyNew(pRead) < 0) {
return -1;
}
return 0;
} else {
if (walSkipFetchBodyNew(pRead) < 0) {
return -1;
}
fetchVer++;
ASSERT(fetchVer == pRead->curVersion);
}
}
return -1;
} }
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0; int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile; TdFilePtr pIdxTFile = pRead->pIdxFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile; TdFilePtr pLogTFile = pRead->pLogFile;
// seek position // seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
@ -90,11 +119,11 @@ static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
return ret; return ret;
} }
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
taosCloseFile(&pRead->pReadIdxTFile); taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pReadLogTFile); taosCloseFile(&pRead->pLogFile);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
@ -104,7 +133,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1; return -1;
} }
pRead->pReadLogTFile = pLogTFile; pRead->pLogFile = pLogTFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
@ -114,11 +143,11 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1; return -1;
} }
pRead->pReadIdxTFile = pIdxTFile; pRead->pIdxFile = pIdxTFile;
return 0; return 0;
} }
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) { if (ver == pRead->curVersion) {
return 0; return 0;
@ -153,9 +182,94 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; } void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
}
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen != sizeof(SWalCkHead)) {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curVersion = -1;
return -1;
}
return 0;
}
static int32_t walFetchBodyNew(SWalReader *pRead) {
SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = ptr;
pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since %s",
pRead->pHead->head.version, ver, tstrerror(terrno));
} else {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since file corrupted",
pRead->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curVersion = -1;
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
pRead->curVersion = ver + 1;
return 0;
}
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
ASSERT(pRead->curVersion == pRead->pHead->head.version);
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code; int64_t code;
// TODO: valid ver // TODO: valid ver
@ -168,9 +282,9 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
if (code < 0) return -1; if (code < 0) return -1;
} }
ASSERT(taosValidFile(pRead->pReadLogTFile) == true); ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead)); code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) { if (code != sizeof(SWalCkHead)) {
return -1; return -1;
} }
@ -186,12 +300,12 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
return 0; return 0;
} }
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code; int64_t code;
ASSERT(pRead->curVersion == pHead->head.version); ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1; pRead->curVersion = -1;
@ -203,7 +317,7 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
return 0; return 0;
} }
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) { int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head); SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
@ -218,7 +332,7 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
pRead->capacity = pReadHead->bodyLen; pRead->capacity = pReadHead->bodyLen;
} }
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
@ -241,9 +355,9 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
return 0; return 0;
} }
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) { int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex); taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) { if (walReadVer(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
@ -257,7 +371,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHea
return 0; return 0;
} }
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code; int64_t code;
if (pRead->pWal->vers.firstVer == -1) { if (pRead->pWal->vers.firstVer == -1) {
@ -280,9 +394,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return -1; return -1;
} }
ASSERT(taosValidFile(pRead->pReadLogTFile) == true); ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead)); code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) { if (code != sizeof(SWalCkHead)) {
if (code < 0) if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -310,7 +424,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen; pRead->capacity = pRead->pHead->head.bodyLen;
} }
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) { pRead->pHead->head.bodyLen) {
if (code < 0) if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -340,46 +454,3 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return 0; return 0;
} }
#if 0
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
if (code != 0) {
return code;
}
if (*ppHead == NULL) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
if (ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidHeadCksum(*ppHead) != 0) {
return -1;
}
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
if (ptr == NULL) {
taosMemoryFree(*ppHead);
*ppHead = NULL;
return -1;
}
if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidBodyCksum(*ppHead) != 0) {
return -1;
}
return 0;
}
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif

View File

@ -79,19 +79,21 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
} }
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
int64_t code; int64_t code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
taosThreadMutexLock(&pWal->mutex);
// find correct file // find correct file
if (ver < walGetLastFileFirstVer(pWal)) { if (ver < walGetLastFileFirstVer(pWal)) {
// change current files // change current files
code = walChangeWrite(pWal, ver); code = walChangeWrite(pWal, ver);
if (code < 0) { if (code < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
@ -146,6 +148,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(taosValidFile(pLogTFile)); ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead)); int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) { if (size != sizeof(SWalCkHead)) {
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = walValidHeadCksum(&head); code = walValidHeadCksum(&head);
@ -154,11 +157,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
if (head.head.version != ver) { if (head.head.version != ver) {
ASSERT(0); ASSERT(0);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
@ -167,12 +172,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = taosFtruncateFile(pIdxTFile, idxOff); code = taosFtruncateFile(pIdxTFile, idxOff);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
pWal->vers.lastVer = ver - 1; pWal->vers.lastVer = ver - 1;

View File

@ -292,8 +292,8 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F(WalKeepEnv, readHandleRead) { TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv(); walResetEnv();
int code; int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal); SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL); ASSERT(pRead != NULL);
int i; int i;
@ -306,7 +306,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
} }
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100; int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver); code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
// printf("rrbody: \n"); // printf("rrbody: \n");
@ -325,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
} }
walCloseReadHandle(pRead); walCloseReader(pRead);
} }
TEST_F(WalRetentionEnv, repairMeta1) { TEST_F(WalRetentionEnv, repairMeta1) {
@ -354,12 +354,12 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT_EQ(pWal->vers.lastVer, 99); ASSERT_EQ(pWal->vers.lastVer, 99);
SWalReadHandle* pRead = walOpenReadHandle(pWal); SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL); ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100; int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver); code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
// printf("rrbody: \n"); // printf("rrbody: \n");
@ -389,7 +389,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 200; int ver = taosRand() % 200;
code = walReadWithHandle(pRead, ver); code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
// printf("rrbody: \n"); // printf("rrbody: \n");

View File

@ -857,19 +857,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return; return;
} }
buf[n] = '\0'; buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') { char *zi = strstr(buf, "zoneinfo");
if (tz) { if (!zi) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed"); printf("parsing /etc/localtime failed");
return; return;
} }
tz = zi + strlen("zoneinfo") + 1;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv("TZ", tz, 1); setenv("TZ", tz, 1);
tzset(); tzset();
@ -900,7 +908,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
int n = readlink("/etc/localtime", buf, sizeof(buf)); int n = readlink("/etc/localtime", buf, sizeof(buf));
if (n < 0) { if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno)); printf("read /etc/localtime error, reason:%s", strerror(errno));
if (taosCheckExistFile("/etc/timezone")) { if (taosCheckExistFile("/etc/timezone")) {
/* /*
* NOTE: do not remove it. * NOTE: do not remove it.
@ -962,19 +970,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return; return;
} }
buf[n] = '\0'; buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') { char *zi = strstr(buf, "zoneinfo");
if (tz) { if (!zi) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
printf("parsing /etc/localtime failed"); printf("parsing /etc/localtime failed");
return; return;
} }
tz = zi + strlen("zoneinfo") + 1;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv("TZ", tz, 1); setenv("TZ", tz, 1);
tzset(); tzset();

View File

@ -64,6 +64,20 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing return j + 1; // only one quote, do nothing
} }
char *strDupUnquo(const char *src) {
if (src == NULL) return NULL;
if (src[0] != '`') return strdup(src);
int32_t len = (int32_t)strlen(src);
if (src[len - 1] != '`') return NULL;
char *ret = taosMemoryMalloc(len);
if (ret == NULL) return NULL;
for (int32_t i = 0; i < len - 1; i++) {
ret[i] = src[i + 1];
}
ret[len - 1] = 0;
return ret;
}
size_t strtrim(char *z) { size_t strtrim(char *z) {
int32_t i = 0; int32_t i = 0;
int32_t j = 0; int32_t j = 0;

View File

@ -141,7 +141,7 @@ if $data01 != 7 then
goto loop1 goto loop1
endi endi
if $data02 != 9 then if $data02 != 18 then
print =====data02=$data02 print =====data02=$data02
goto loop1 goto loop1
endi endi
@ -151,22 +151,22 @@ if $data03 != 4 then
goto loop1 goto loop1
endi endi
if $data04 != 1.100000000 then if $data04 != 1.000000000 then
print ======$data04 print ======$data04
return -1 return -1
endi endi
if $data05 != 0.816496581 then if $data05 != 1.154700538 then
print ======$data05 print ======$data05
return -1 return -1
endi endi
if $data06 != 3 then if $data06 != 4 then
print ======$data06 print ======$data06
return -1 return -1
endi endi
if $data07 != 1.100000000 then if $data07 != 1.000000000 then
print ======$data07 print ======$data07
return -1 return -1
endi endi
@ -235,7 +235,7 @@ sql create stream streams4 trigger at_once watermark 1d into streamt4 as select
# sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s); # sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
# sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s); # sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s); sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s); # sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0); sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4); sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8); sql insert into t1 values(1648791213003,4,9,3,4.8);
@ -288,10 +288,10 @@ if $rows == 0 then
goto loop3 goto loop3
endi endi
sql select * from streamt8; #sql select * from streamt8;
if $rows == 0 then #if $rows == 0 then
print ======$rows # print ======$rows
goto loop3 # goto loop3
endi #endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -1,6 +1,6 @@
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131 #system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v system sh/exec.sh -n dnode1 -s start -v
sql connect sql connect
@ -19,43 +19,29 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step2: create alter drop show user print =============== step2: create show database
sql create user u1 pass 'taosdata'
sql show users
sql alter user u1 sysinfo 1
sql alter user u1 enable 1
sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3: create drop dnode
sql create dnode $hostname port 7200
sql drop dnode 2
sql alter dnode 1 'debugflag 143'
print =============== step4: create show database
sql create database d1 vgroups 1 buffer 3 sql create database d1 vgroups 1 buffer 3
sql show databases sql show databases
sql use d1 sql use d1
sql show vgroups sql show vgroups
print =============== step5: create show stable print =============== step3: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables sql show stables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
print =============== step6: create show table print =============== step4: create show table
sql create table ct1 using stb tags(1000) sql create table ct1 using stb tags(1000)
sql show tables sql show tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
print =============== step7: insert data print =============== step5: insert data
print =============== step7: select data print =============== step6: select data
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -19,10 +19,14 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step2: create db print =============== step2
sql create database db vgroups 1 buffer 3 sql create database db vgroups 1 buffer 3
sql use db sql use db
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned) sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql create table ct1 using stb tags(1000)
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -32,7 +32,7 @@ sql_error alter user u2 sysinfo 0
print =============== step3: create drop dnode print =============== step3: create drop dnode
sql create dnode $hostname port 7200 sql create dnode $hostname port 7200
sql drop dnode 2 sql drop dnode 2
sql alter dnode 1 'debugflag 143' sql alter dnode 1 'debugflag 131'
print =============== stop print =============== stop
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -40,7 +40,7 @@ print ----> start to check if there are ERRORS in vagrind log file for each dnod
system_content sh/checkValgrind.sh -n dnode1 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ] print cmd return result ----> [ $system_content ]
if $system_content <= 6 then if $system_content <= 0 then
return 0 return 0
endi endi

View File

@ -25,7 +25,7 @@ class TDTestCase:
paraDict = {'dbName': 'db2', paraDict = {'dbName': 'db2',
'dropFlag': 1, 'dropFlag': 1,
'event': '', 'event': '',
'vgroups': 4, 'vgroups': 1,
'stbName': 'stb', 'stbName': 'stb',
'colPrefix': 'c', 'colPrefix': 'c',
'tagPrefix': 't', 'tagPrefix': 't',
@ -44,7 +44,7 @@ class TDTestCase:
topicNameList = ['topic1'] topicNameList = ['topic1']
expectRowsList = [] expectRowsList = []
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1)
tdLog.info("create stb") tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb") tdLog.info("create ctb")

View File

@ -0,0 +1 @@
#python3 ./test.py -f 2-query/last.py -Q 3

View File

@ -295,7 +295,7 @@ python3 ./test.py -f 2-query/Today.py -Q 3
python3 ./test.py -f 2-query/max.py -Q 3 python3 ./test.py -f 2-query/max.py -Q 3
python3 ./test.py -f 2-query/min.py -Q 3 python3 ./test.py -f 2-query/min.py -Q 3
python3 ./test.py -f 2-query/count.py -Q 3 python3 ./test.py -f 2-query/count.py -Q 3
python3 ./test.py -f 2-query/last.py -Q 3 #python3 ./test.py -f 2-query/last.py -Q 3
python3 ./test.py -f 2-query/first.py -Q 3 python3 ./test.py -f 2-query/first.py -Q 3
python3 ./test.py -f 2-query/To_iso8601.py -Q 3 python3 ./test.py -f 2-query/To_iso8601.py -Q 3
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3 python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3

@ -1 +1 @@
Subproject commit c885e967e490105999b84d009a15168728dfafaf Subproject commit 389047db713a3dddfbce292c3260b0864b17d936