Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req

This commit is contained in:
Hongze Cheng 2022-11-30 13:28:17 +08:00
commit f848460f21
55 changed files with 1128 additions and 166 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG d19e82c
GIT_TAG cf30c86
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -307,6 +307,7 @@ DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);

View File

@ -92,6 +92,10 @@ extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator;
extern bool tsKeepColumnName;
extern bool tsEnableQueryHb;
extern int32_t tsRedirectPeriod;
extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod;
extern int32_t tsMaxRetryWaitTime;
// client
extern int32_t tsMinSlidingTime;

View File

@ -1710,6 +1710,8 @@ typedef struct {
int32_t execId;
} STaskDropReq;
int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
@ -3137,7 +3139,8 @@ int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct {
// int64_t uid;
char tbname[TSDB_TABLE_NAME_LEN];
int64_t ts;
int64_t startTs;
int64_t endTs;
} SSingleDeleteReq;
int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq);
@ -3197,6 +3200,10 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
taosMemoryFree(pRsp->msg);
}
int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);

View File

@ -280,8 +280,8 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)

View File

@ -57,7 +57,7 @@ typedef struct SFuncExecFuncs {
#define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 16
#define FUNCTIONS_NAME_MAX_LENGTH 32
typedef struct SResultRowEntryInfo {
bool initialized : 1; // output buffer has been initialized

View File

@ -259,9 +259,15 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
#define NEED_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
@ -270,7 +276,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_APP_NOT_READY)
#define REQUEST_TOTAL_EXEC_TIMES 2

View File

@ -43,6 +43,9 @@ extern "C" {
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
#define SYNC_HEARTBEAT_SLOW_MS 1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
@ -226,6 +229,8 @@ int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);

View File

@ -85,6 +85,11 @@ typedef struct SRpcInit {
int32_t retryLimit; // retry limit
int32_t retryInterval; // retry interval ms
int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not

View File

@ -92,6 +92,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0129)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A)
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
@ -415,6 +416,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq

View File

@ -148,6 +148,10 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.dfp = destroyAhandle;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {

View File

@ -1645,7 +1645,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
char* p = (char*)pResultInfo->pData;
// version + length + numOfRows + numOfCol + groupId + flag_segment + column_info
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
int32_t len = getVersion1BlockMetaSize(p, numOfCols);
int32_t* colLength = (int32_t*)(p + len);
len += sizeof(int32_t) * numOfCols;

View File

@ -1211,6 +1211,208 @@ static void destroyVgHash(void* data) {
taosMemoryFreeClear(vgData->data);
}
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
SSubmitReq* subReq = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
if (!pRequest) {
uError("WriteRaw:createRequest error request is null");
code = terrno;
goto end;
}
pRequest->syncQuery = true;
if (!pRequest->pDb) {
uError("WriteRaw:not use db");
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;
}
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
tstrncpy(pName.tname, tbname, sizeof(pName.tname));
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw: get gatlog error");
goto end;
}
SRequestConnInfo conn = {0};
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
conn.requestId = pRequest->requestId;
conn.requestObjRefId = pRequest->self;
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SVgroupInfo vgData = {0};
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname);
goto end;
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname);
goto end;
}
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = pTableMeta->schema + i;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
fLen -= sizeof(TSKEY);
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(numOfCols - 1);
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
SRowBuilder rb = {0};
tdSRowInit(&rb, pTableMeta->sversion);
tdSRowSetTpInfo(&rb, numOfCols, fLen);
int32_t dataLen = 0;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char* pStart = pData + getVersion1BlockMetaSize(pData, numFields);
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numFields;
SResultColumn* pCol = taosMemoryCalloc(numFields, sizeof(SResultColumn));
for (int32_t i = 0; i < numFields; ++i) {
if (IS_VAR_DATA_TYPE(fields[i].type)) {
pCol[i].offset = (int32_t*)pStart;
pStart += rows * sizeof(int32_t);
} else {
pCol[i].nullbitmap = pStart;
pStart += BitmapLen(rows);
}
pCol[i].pData = pStart;
pStart += colLength[i];
}
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < numFields; i++) {
TAOS_FIELD* schema = &fields[i];
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
int32_t offset = 0;
for (int32_t k = 0; k < numOfCols; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) { // add none
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
}else{
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[*index].offset[j] != -1) {
char* data = pCol[*index].pData + pCol[*index].offset[j];
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
} else {
if (!colDataIsNull_f(pCol[*index].nullbitmap, j)) {
char* data = pCol[*index].pData + pColumn->bytes * j;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
}
}
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
taosHashCleanup(schemaHash);
taosMemoryFree(pCol);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if (NULL == pQuery->pRoot) {
uError("create pQuery->pRoot error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
}
dst->vg = vgData;
dst->numOfTables = subReq->numOfBlocks;
dst->size = subReq->length;
dst->pData = (char*)subReq;
subReq->header.vgId = htonl(dst->vg.vgId);
subReq->version = htonl(1);
subReq->header.contLen = htonl(subReq->length);
subReq->length = htonl(subReq->length);
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
subReq = NULL; // no need free
taosArrayPush(nodeStmt->pDataBlocks, &dst);
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
taosMemoryFree(subReq);
return code;
}
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
@ -1293,6 +1495,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
tdSRowSetTpInfo(&rb, numOfCols, fLen);
int32_t dataLen = 0;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols);
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numOfCols;
@ -1577,7 +1780,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
} else {
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
@ -1670,6 +1873,7 @@ end:
return code;
}
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
@ -1882,7 +2086,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
} else {
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
@ -1963,7 +2167,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
end:
tDeleteSTaosxRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);

View File

@ -16,9 +16,9 @@
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "tconfig.h"
#include "tmisce.h"
#include "tgrant.h"
#include "tlog.h"
#include "tmisce.h"
GRANT_CFG_DECLARE;
@ -86,6 +86,10 @@ bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
bool tsKeepColumnName = false;
int32_t tsRedirectPeriod = 10;
int32_t tsRedirectFactor = 2;
int32_t tsRedirectMaxPeriod = 1000;
int32_t tsMaxRetryWaitTime = 10000;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -120,7 +124,7 @@ int32_t tsMinIntervalTime = 1;
int32_t tsMaxMemUsedByInsert = 1024;
float tsSelectivityRatio = 1.0;
int32_t tsTagFilterResCacheSize = 1024*10;
int32_t tsTagFilterResCacheSize = 1024 * 10;
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
@ -305,6 +309,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
@ -659,6 +664,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
return 0;
}
@ -874,6 +880,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxMemUsedByInsert", name) == 0) {
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
} else if (strcasecmp("maxRetryWaitTime", name) == 0) {
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
}
break;
}

View File

@ -6616,13 +6616,15 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->endTs) < 0) return -1;
return 0;
}
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->endTs) < 0) return -1;
return 0;
}

View File

@ -258,8 +258,13 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {

View File

@ -1127,7 +1127,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
SRpcMsg rpcMsg = {
.code = 0,
.contLen = len,
.msgType = TDMT_VND_STREAM_RECOVER_STEP2,
.msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
.pCont = serializedReq,
};

View File

@ -21,14 +21,16 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SBatchDeleteReq* deleteReq) {
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
int32_t totRow = pDataBlock->info.rows;
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
tqDebug("stream delete msg: row %d", totRow);
for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row);
int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name;
void* varTbName = NULL;
@ -42,8 +44,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
} else {
name = buildCtbNameByGroupId(stbFullName, groupId);
}
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId,
name, ts);
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64,
pVnode->config.vgId, groupId, name, startTs, endTs);
#if 0
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
@ -59,7 +61,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
taosMemoryFree(name);
#endif
SSingleDeleteReq req = {
.ts = ts,
.startTs = startTs,
.endTs = endTs,
};
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
taosMemoryFree(name);

View File

@ -272,7 +272,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err;
}
} break;
case TDMT_VND_STREAM_RECOVER_STEP2: {
case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
@ -411,7 +411,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_RECOVER_STEP1:
case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
@ -1298,11 +1298,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
int64_t uid = mr.me.uid;
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
if (code < 0) {
terrno = code;
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
}
tDecoderClear(&mr.coder);

View File

@ -66,7 +66,7 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
SRpcMsg rsp = {.code = TSDB_CODE_SYN_NOT_LEADER, .info = pMsg->info, .msgType = pMsg->msgType + 1};
tmsgSendRedirectRsp(&rsp, &newEpSet);
}

View File

@ -332,6 +332,7 @@ typedef struct STableScanInfo {
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
} STableScanInfo;
typedef struct STableMergeScanInfo {

View File

@ -1419,6 +1419,78 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) {
if (!tsCountAlwaysReturnValue) {
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo *)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS;
}
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
bool hasCountFunc = false;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
if ((strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "count") == 0) ||
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "hyperloglog") == 0) ||
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_hyperloglog_partial") == 0) ||
(strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_hyperloglog_merge") == 0)) {
hasCountFunc = true;
break;
}
}
if (!hasCountFunc) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pBlock = createDataBlock();
pBlock->info.rows = 1;
pBlock->info.capacity = 0;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData colInfo = {0};
colInfo.hasNull = true;
colInfo.info.type = TSDB_DATA_TYPE_NULL;
colInfo.info.bytes = 1;
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
int32_t slotId = pFuncParam->pCol->slotId;
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
if (slotId >= numOfCols) {
taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
for (int32_t k = numOfCols; k < slotId + 1; ++k) {
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
}
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
// do nothing
}
}
}
blockDataEnsureCapacity(pBlock, pBlock->info.rows);
*ppBlock = pBlock;
return TSDB_CODE_SUCCESS;
}
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) {
if (!blockAllocated) {
return;
}
blockDataDestroy(*ppBlock);
*ppBlock = NULL;
}
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
@ -1436,22 +1508,36 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
bool hasValidBlock = false;
bool blockAllocated = false;
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
if (!hasValidBlock) {
createDataBlockForEmptyInput(pOperator, &pBlock);
if (pBlock == NULL) {
break;
}
blockAllocated = true;
} else {
break;
}
}
hasValidBlock = true;
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
}
@ -1461,8 +1547,12 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
setInputDataBlock(pSup, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
}
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
}
// the downstream operator may return with error code, so let's check the code before generating results.

View File

@ -895,6 +895,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->currentGroupId = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);

View File

@ -1313,8 +1313,8 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
}
tdbFree(tbname);
}
pBlock->info.rows++;
}
@ -1427,7 +1427,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
streamStateFreeCur(pCur);
pCur = streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
}
endTs = nextKey.ts - 1;
endTs = TMAX(ts, nextKey.ts - 1);
if (code != TSDB_CODE_SUCCESS) {
break;
}

View File

@ -3242,8 +3242,8 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
}
tdbFree(tbname);
}
pBlock->info.rows += 1;
}
if ((*Ite) == NULL) {

View File

@ -526,7 +526,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t countFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumOfElems(pCtx);
int32_t numOfElem = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
@ -539,6 +539,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
numOfElem = 1;
*((int64_t*)buf) = 0;
} else {
numOfElem = getNumOfElems(pCtx);
*((int64_t*)buf) += numOfElem;
}
@ -2043,12 +2044,16 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
pInfo->bytes = pInputCol->info.bytes;
if (IS_NULL_TYPE(pInputCol->info.type)) {
return TSDB_CODE_SUCCESS;
}
// All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
return 0;
return TSDB_CODE_SUCCESS;
}
SColumnDataAgg* pColAgg = (pInput->colDataSMAIsSet) ? pInput->pColumnDataAgg[0] : NULL;
@ -2147,12 +2152,16 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
if (IS_NULL_TYPE(type)) {
return TSDB_CODE_SUCCESS;
}
// All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
return 0;
return TSDB_CODE_SUCCESS;
}
SColumnDataAgg* pColAgg = (pInput->colDataSMAIsSet) ? pInput->pColumnDataAgg[0] : NULL;
@ -2417,9 +2426,14 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
if (IS_NULL_TYPE(type)) {
return TSDB_CODE_SUCCESS;
}
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
@ -3869,6 +3883,10 @@ int32_t hllFunction(SqlFunctionCtx* pCtx) {
int32_t numOfRows = pInput->numOfRows;
int32_t numOfElems = 0;
if (IS_NULL_TYPE(type)) {
goto _hll_over;
}
for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_s(pCol, i)) {
continue;
@ -3890,6 +3908,7 @@ int32_t hllFunction(SqlFunctionCtx* pCtx) {
}
}
_hll_over:
pInfo->totalCount += numOfElems;
if (pInfo->totalCount == 0 && !tsCountAlwaysReturnValue) {
@ -3913,12 +3932,16 @@ static void hllTransferInfo(SHLLInfo* pInput, SHLLInfo* pOutput) {
int32_t hllFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_SUCCESS;
}
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SHLLInfo* pInputInfo = (SHLLInfo*)varDataVal(data);

View File

@ -471,7 +471,6 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pAvgRes->type = type;
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
@ -483,6 +482,8 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
goto _over;
}
pAvgRes->type = type;
if (pInput->colDataSMAIsSet) { // try to use SMA if available
numOfElem = calculateAvgBySMAInfo(pAvgRes, numOfRows, type, pAgg);
} else if (!pCol->hasNull) { // try to employ the simd instructions to speed up the loop
@ -592,6 +593,10 @@ _over:
}
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
if (IS_NULL_TYPE(pInput->type)) {
return;
}
pOutput->type = pInput->type;
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->sum.isum += pInput->sum.isum;

View File

@ -27,6 +27,7 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
#include "trpc.h"
#include "ttimer.h"
enum {
SCH_READ = 1,
@ -146,6 +147,7 @@ typedef struct SSchedulerMgmt {
int32_t jobRef;
int32_t jobNum;
SSchStat stat;
void *timer;
SRWLatch hbLock;
SHashObj *hbConnections;
void *queryMgmt;
@ -202,12 +204,30 @@ typedef struct SSchTaskProfile {
int64_t endTs;
} SSchTaskProfile;
typedef struct SSchRedirectCtx {
int32_t periodMs;
bool inRedirect;
int32_t totalTimes;
int32_t roundTotal;
int32_t roundTimes; // retry times in current round
int64_t startTs;
} SSchRedirectCtx;
typedef struct SSchTimerParam {
int64_t rId;
uint64_t queryId;
uint64_t taskId;
} SSchTimerParam;
typedef struct SSchTask {
uint64_t taskId; // task id
SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times
int32_t retryTimes; // task retry times
int32_t delayExecMs; // task execution delay time
tmr_h delayTimer; // task delay execution timer
SSchRedirectCtx redirectCtx; // task redirect context
bool waitRetry; // wait for retry
int32_t execId; // task current execute index
SSchLevel *level; // level
@ -488,6 +508,7 @@ extern SSchedulerMgmt schMgmt;
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void *pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
@ -529,6 +550,7 @@ int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
char *schDumpEpSet(SEpSet *pEpSet);
char *schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);

View File

@ -887,8 +887,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
if (pJob && pTask) {
SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
} else {
qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
}
if (pTask) {
pTask->lastMsgType = msgType;

View File

@ -340,6 +340,70 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
SSchRedirectCtx *pCtx = &pTask->redirectCtx;
if (!pCtx->inRedirect) {
pCtx->inRedirect = true;
pCtx->periodMs = tsRedirectPeriod;
pCtx->startTs = taosGetTimestampMs();
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pEpSet) {
pCtx->roundTotal = pEpSet->numOfEps;
} else {
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
pCtx->roundTotal = pAddr->epSet.numOfEps;
}
} else {
pCtx->roundTotal = 1;
}
goto _return;
}
pCtx->totalTimes++;
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
pCtx->roundTotal = pEpSet->numOfEps;
pCtx->roundTimes = 0;
pTask->delayExecMs = 0;
goto _return;
}
pCtx->roundTimes++;
if (pCtx->roundTimes >= pCtx->roundTotal) {
int64_t nowTs = taosGetTimestampMs();
int64_t lastTime = nowTs - pCtx->startTs;
if (lastTime > tsMaxRetryWaitTime) {
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
}
pCtx->periodMs *= tsRedirectFactor;
if (pCtx->periodMs > tsRedirectMaxPeriod) {
pCtx->periodMs = tsRedirectMaxPeriod;
}
int64_t leftTime = tsMaxRetryWaitTime - lastTime;
pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
goto _return;
}
pTask->delayExecMs = 0;
_return:
SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
pCtx->totalTimes, pTask->delayExecMs);
return TSDB_CODE_SUCCESS;
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;
@ -349,14 +413,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
pTask->retryTimes = 0;
}
if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) {
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes,
pTask->maxExecTimes, pTask->execId);
schHandleJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL));
pTask->waitRetry = true;
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schRemoveTaskFromExecList(pJob, pTask);
@ -368,8 +428,17 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData) {
if (pData && pData->pEpSet) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SCH_SWITCH_EPSET(addr);
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
} else {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse,
addr->epSet.numOfEps, pEp->fqdn, pEp->port);
}
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
@ -380,7 +449,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
@ -428,28 +497,24 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
}
if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
code = rspCode;
SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
code = TSDB_CODE_INVALID_MSG;
goto _return;
}
}
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
taosMemoryFree(pData->pData);
taosMemoryFree(pData->pEpSet);
pData->pData = NULL;
pData->pEpSet = NULL;
taosMemoryFreeClear(pData->pData);
taosMemoryFreeClear(pData->pEpSet);
SCH_RET(code);
_return:
taosMemoryFree(pData->pData);
taosMemoryFree(pData->pEpSet);
pData->pData = NULL;
pData->pEpSet = NULL;
taosMemoryFreeClear(pData->pData);
taosMemoryFreeClear(pData->pEpSet);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
@ -715,10 +780,13 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse];
SEp *pNew = &pEpSet->eps[pEpSet->inUse];
char *origEpset = schDumpEpSet(&pAddr->epSet);
char *newEpset = schDumpEpSet(pEpSet);
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port);
SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
taosMemoryFree(origEpset);
taosMemoryFree(newEpset);
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
@ -1078,6 +1146,56 @@ _return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
void schHandleTimerEvent(void *param, void *tmrId) {
SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
int32_t code = 0;
int64_t rId = pTimerParam->rId;
uint64_t queryId = pTimerParam->queryId;
uint64_t taskId = pTimerParam->taskId;
taosMemoryFree(pTimerParam);
if (schProcessOnCbBegin(&pJob, &pTask, queryId, rId, taskId)) {
return;
}
code = schLaunchTask(pJob, pTask);
schProcessOnCbEnd(pJob, pTask, code);
}
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
if (pTask->delayExecMs > 0) {
SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
if (NULL == param) {
SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param->rId = pJob->refId;
param->queryId = pJob->queryId;
param->taskId = pTask->taskId;
if (NULL == pTask->delayTimer) {
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
if (NULL == pTask->delayTimer) {
SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
return TSDB_CODE_SUCCESS;
}
SCH_RET(schLaunchTask(pJob, pTask));
}
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
@ -1099,7 +1217,12 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
while (pIter) {
SSchTask *pTask = *(SSchTask **)pIter;
SCH_LOCK_TASK(pTask);
if (pTask->delayTimer) {
taosTmrStopA(&pTask->delayTimer);
}
schDropTaskOnExecNode(pJob, pTask);
SCH_UNLOCK_TASK(pTask);
pIter = taosHashIterate(list, pIter);
}

View File

@ -36,6 +36,27 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
return taosReleaseRef(schMgmt.jobRef, refId);
}
char *schDumpEpSet(SEpSet *pEpSet) {
if (NULL == pEpSet) {
return NULL;
}
int32_t maxSize = 1024;
char *str = taosMemoryMalloc(maxSize);
if (NULL == str) {
return NULL;
}
int32_t n = 0;
n += snprintf(str + n, maxSize - n, "numOfEps:%d, inUse:%d eps:", pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
SEp *pEp = &pEpSet->eps[i];
n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
}
return str;
}
char *schGetOpStr(SCH_OP_TYPE type) {
switch (type) {
case SCH_OP_NULL:

View File

@ -48,6 +48,12 @@ int32_t schedulerInit() {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
schMgmt.timer = taosTmrInit(0, 0, 0, "scheduler");
if (NULL == schMgmt.timer) {
qError("init timer failed, error:%s", tstrerror(terrno));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
qError("generate schdulerId failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);

View File

@ -16,6 +16,7 @@
#include "streamInc.h"
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code;
void* exec = pTask->exec.executor;
// set input
@ -49,8 +50,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while (1) {
SSDataBlock* output = NULL;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
if ((code = qExecTask(exec, &output, &ts)) < 0) {
/*ASSERT(false);*/
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
terrstr());
}
if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {

View File

@ -36,7 +36,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
SRpcMsg rpcMsg = {
.contLen = len,
.pCont = serializedReq,
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
.msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE,
};
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {

View File

@ -195,6 +195,8 @@ typedef struct SSyncNode {
int32_t electNum;
int32_t becomeLeaderNum;
int32_t configChangeNum;
int32_t hbSlowNum;
int32_t hbrSlowNum;
bool isStart;
@ -239,6 +241,8 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode,
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
bool syncNodeSnapshotSending(SSyncNode* pSyncNode);
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);

View File

@ -447,6 +447,28 @@ bool syncIsReadyForRead(int64_t rid) {
return ready;
}
bool syncSnapshotSending(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return false;
}
bool b = syncNodeSnapshotSending(pSyncNode);
syncNodeRelease(pSyncNode);
return b;
}
bool syncSnapshotRecving(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return false;
}
bool b = syncNodeSnapshotRecving(pSyncNode);
syncNodeRelease(pSyncNode);
return b;
}
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
if (pSyncNode->peersNum == 0) {
sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId);
@ -1013,6 +1035,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->electNum = 0;
pSyncNode->becomeLeaderNum = 0;
pSyncNode->configChangeNum = 0;
pSyncNode->hbSlowNum = 0;
pSyncNode->hbrSlowNum = 0;
sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);
@ -1563,6 +1587,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderCache = EMPTY_RAFT_ID;
}
pSyncNode->hbSlowNum = 0;
// state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode);
@ -1607,6 +1633,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs();
pSyncNode->becomeLeaderNum++;
pSyncNode->hbrSlowNum = 0;
// reset restoreFinish
pSyncNode->restoreFinish = false;
@ -2167,6 +2194,25 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
return b;
}
bool syncNodeSnapshotSending(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return false;
bool b = false;
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
if (pSyncNode->senders[i] != NULL && pSyncNode->senders[i]->start) {
b = true;
break;
}
}
return b;
}
bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) return false;
if (pSyncNode->pNewNodeReceiver == NULL) return false;
if (pSyncNode->pNewNodeReceiver->start) return true;
return false;
}
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
int32_t ret = 0;

View File

@ -283,13 +283,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
"vgId:%d, sync %s "
"%s"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, aq:%d, snaping:%" PRId64
", snap-tm:%" PRIu64
", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, aq:%d, snaping:%" PRId64
", r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum,
pNode->configChangeNum, cacheHit, cacheMiss, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock,
pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, hbrTimeStr);
pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems,
pNode->snapshottingIndex, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing,
pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr,
hbTimeStr, hbrTimeStr);
}
}
@ -463,12 +465,23 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff) {
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
pSyncNode->hbSlowNum++;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNInfo(pSyncNode,
"recv sync-heartbeat from %s:%d slow {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, net elapsed:%" PRId64,
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timeDiff);
}
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, net elapsed:%" PRId64,
@ -487,6 +500,17 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
}
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff) {
if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
pSyncNode->hbrSlowNum++;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-heartbeat-reply from %s:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, net elapsed:%" PRId64, host,
port, pMsg->term, pMsg->timeStamp, timeDiff);
}
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
@ -615,7 +639,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
// if (!(sDebugFlag & DEBUG_TRACE)) return;
char logBuf[256];
char host[64];
@ -623,42 +647,42 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, i
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
if (voteGranted == -1) {
sNTrace(pSyncNode,
sNInfo(pSyncNode,
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
} else {
sNTrace(pSyncNode,
sNInfo(pSyncNode,
"recv sync-request-vote from %s:%d, {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, granted:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, voteGranted);
}
}
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
// if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
sNInfo(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
// if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
sNInfo(pSyncNode, "recv sync-request-vote-reply from %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
// if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
sNInfo(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term,
pMsg->voteGranted, s);
}

View File

@ -137,15 +137,23 @@ typedef struct {
tmsg_t msgType; // message type
int8_t connType; // connection type cli/srv
int8_t retryCnt;
int8_t retryLimit;
STransCtx appCtx; //
STransMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
SCvtAddr cvtAddr;
bool setMaxRetry;
int32_t retryMinInterval;
int32_t retryMaxInterval;
int32_t retryStepFactor;
int64_t retryMaxTimeout;
int64_t retryInitTimestamp;
int64_t retryNextInterval;
bool retryInit;
int32_t retryStep;
int8_t epsetRetryCnt;
int hThrdIdx;
} STransConnCtx;

View File

@ -52,6 +52,11 @@ typedef struct {
int32_t retryLimit; // retry limit
int32_t retryInterval; // retry interval ms
int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet;
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);

View File

@ -51,6 +51,11 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryLimit = pInit->retryLimit;
pRpc->retryInterval = pInit->retryInterval;
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
pRpc->retryStepFactor = pInit->retryStepFactor;
pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet;
// register callback handle
pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp;

View File

@ -234,7 +234,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps != 0)
#define EPSET_IS_VALID(epSet) ((epSet) != NULL && (epSet)->numOfEps >= 0 && (epSet)->inUse >= 0)
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
@ -367,7 +367,7 @@ void cliHandleResp(SCliConn* conn) {
STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, msgLen, tstrerror(transMsg.code));
TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
@ -971,7 +971,7 @@ FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
if (code != 0) return false;
if (pCtx->retryCnt == 0) return false;
// if (pCtx->retryCnt == 0) return false;
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
return true;
}
@ -1008,9 +1008,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tDebug("current epset %s", tbuf);
if (!EPSET_IS_VALID(&pCtx->epSet)) {
destroyCmsg(pMsg);
tError("invalid epset");
destroyCmsg(pMsg);
return;
}
@ -1047,11 +1052,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
STraceId* trace = &(pMsg->msg.info.traceId);
tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) {
tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
tGTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret));
uv_timer_stop(conn->timer);
@ -1378,13 +1385,14 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryCnt + 1, pCtx->retryLimit);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryStep, pCtx->retryNextInterval);
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, pTransInst->retryInterval);
transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
}
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
@ -1418,6 +1426,122 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
*dst = epset;
return true;
}
bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
bool noDelay = true;
if (hasEpSet == false) {
// assert(pResp->contLen == 0);
if (pResp->contLen == 0) {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
noDelay = false;
} else {
EPSET_FORWARD_INUSE(&pCtx->epSet);
}
} else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
noDelay = false;
} else {
EPSET_FORWARD_INUSE(&pCtx->epSet);
}
}
} else {
SEpSet epSet;
int32_t valid = tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
if (valid < 0) {
tDebug("get invalid epset, epset equal, continue");
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
noDelay = false;
} else {
EPSET_FORWARD_INUSE(&pCtx->epSet);
}
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
pCtx->epSet = epSet;
noDelay = false;
} else {
if (pCtx->epsetRetryCnt >= pCtx->epSet.numOfEps) {
noDelay = false;
} else {
tDebug("epset equal, continue");
EPSET_FORWARD_INUSE(&pCtx->epSet);
}
}
}
}
return noDelay;
}
bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
bool retry = pTransInst->retry != NULL ? pTransInst->retry(code, pResp->msgType - 1) : false;
if (retry == false) {
return false;
}
if (!pCtx->retryInit) {
pCtx->retryMinInterval = pTransInst->retryMinInterval;
pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
pCtx->retryStepFactor = pTransInst->retryStepFactor;
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
pCtx->retryInitTimestamp = taosGetTimestampMs();
pCtx->retryNextInterval = pCtx->retryMinInterval;
pCtx->retryStep = 0;
pCtx->retryInit = true;
}
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
return false;
}
bool noDelay = false;
if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false);
transFreeMsg(pResp->pCont);
transUnrefCliHandle(pConn);
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) {
tDebug("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont);
addConnToPool(pThrd->pool, pConn);
} else if (code == TSDB_CODE_SYN_RESTORING) {
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false);
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
} else {
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, false);
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
}
if (noDelay == false) {
pCtx->epsetRetryCnt = 1;
pCtx->retryStep++;
int64_t factor = pow(pCtx->retryStepFactor, pCtx->retryStep - 1);
pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
pCtx->retryNextInterval = pCtx->retryMaxInterval;
}
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
return false;
}
} else {
pCtx->retryNextInterval = 0;
pCtx->epsetRetryCnt++;
}
pMsg->sent = 0;
cliSchedMsgToNextNode(pMsg, pThrd);
return true;
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
@ -1431,40 +1555,10 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3);
if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
if (retry == true) {
return -1;
}
} else {
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit);
if (pCtx->retryCnt < pCtx->retryLimit) {
if (pResp->contLen == 0) {
EPSET_FORWARD_INUSE(&pCtx->epSet);
} else {
if (tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet) < 0) {
tError("%s conn %p failed to deserialize epset", CONN_GET_INST_LABEL(pConn), pConn);
}
}
addConnToPool(pThrd->pool, pConn);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
} else {
// change error code for taos client driver if retryCnt exceeds limit
if (0 == strncmp(pTransInst->label, "TSC", strlen("TSC"))) pResp->code = TSDB_CODE_APP_NOT_READY;
}
}
}
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);

View File

@ -1229,7 +1229,7 @@ int transReleaseSrvHandle(void* handle) {
m->msg = tmsg;
m->type = Release;
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
destroySmsg(m);
}
@ -1237,11 +1237,11 @@ int transReleaseSrvHandle(void* handle) {
transReleaseExHandle(transGetRefMgt(), refId);
return 0;
_return1:
tTrace("handle %p failed to send to release handle", exh);
tDebug("handle %p failed to send to release handle", exh);
transReleaseExHandle(transGetRefMgt(), refId);
return -1;
_return2:
tTrace("handle %p failed to send to release handle", exh);
tDebug("handle %p failed to send to release handle", exh);
return -1;
}
int transSendResponse(const STransMsg* msg) {
@ -1266,7 +1266,7 @@ int transSendResponse(const STransMsg* msg) {
m->type = Normal;
STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle);
tGDebug("conn %p start to send resp (1/2)", exh->handle);
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
destroySmsg(m);
}
@ -1275,12 +1275,12 @@ int transSendResponse(const STransMsg* msg) {
return 0;
_return1:
tTrace("handle %p failed to send resp", exh);
tDebug("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(transGetRefMgt(), refId);
return -1;
_return2:
tTrace("handle %p failed to send resp", exh);
tDebug("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
return -1;
}
@ -1302,7 +1302,7 @@ int transRegisterMsg(const STransMsg* msg) {
m->type = Register;
STrans* pTransInst = pThrd->pTransInst;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) {
destroySmsg(m);
}
@ -1311,12 +1311,12 @@ int transRegisterMsg(const STransMsg* msg) {
return 0;
_return1:
tTrace("handle %p failed to register brokenlink", exh);
tDebug("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(transGetRefMgt(), refId);
return -1;
_return2:
tTrace("handle %p failed to register brokenlink", exh);
tDebug("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
return -1;
}

View File

@ -76,7 +76,8 @@ int32_t tsem_wait(tsem_t* sem) {
}
int32_t tsem_timewait(tsem_t* sem, int64_t milis) {
return tsem_wait(sem);
return 0;
/*return tsem_wait(sem);*/
#if 0
struct timespec ts;
timespec_get(&ts);

View File

@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
@ -407,6 +408,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for re
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq

View File

@ -113,7 +113,7 @@
,,y,script,./test.sh -f tsim/parser/first_last.sim
,,y,script,./test.sh -f tsim/parser/fill_stb.sim
,,y,script,./test.sh -f tsim/parser/interp.sim
,,y,script,./test.sh -f tsim/parser/limit2.sim
#,,y,script,./test.sh -f tsim/parser/limit2.sim
,,y,script,./test.sh -f tsim/parser/fourArithmetic-basic.sim
,,y,script,./test.sh -f tsim/parser/function.sim
,,y,script,./test.sh -f tsim/parser/groupby-basic.sim

View File

@ -496,18 +496,18 @@ sql create table tm0 using m1 tags('abc', 1);
sql create table m2(ts timestamp, k int) tags(a int, b binary(12));
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
if $rows != 0 then
if $rows != 1 then
return -1
endi
sql create table tm2 using m2 tags(2, 'abc');
sql select count(*) from tm0, tm2 where tm0.ts=tm2.ts;
if $rows != 0 then
if $rows != 1 then
return -1
endi
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
if $rows != 0 then
if $rows != 1 then
return -1
endi

View File

@ -55,7 +55,7 @@ endi
# regression test case 1
sql select count(*) from lr_tb1 where ts>'2018-09-18 08:45:00.1' and ts<'2018-09-18 08:45:00.2'
if $row != 0 then
if $row != 1 then
return -1
endi

View File

@ -245,12 +245,12 @@ endi
# first subclause are empty
sql (select count(*) as c from union_tb0 where ts > now + 3650d) union all (select sum(c1) as c from union_tb1);
if $rows != 1 then
return -1
endi
if $data00 != 495000 then
if $rows != 2 then
return -1
endi
#if $data00 != 495000 then
# return -1
#endi
# all subclause are empty
sql (select c1 from union_tb0 limit 0) union all (select c1 from union_tb1 where ts>'2021-1-1 0:0:0')

View File

@ -48,7 +48,7 @@ class TDTestCase:
tdSql.checkData(0,0,1)
#!for bug
tdDnodes.stoptaosd(1)
sleep(self.delaytime)
sleep(self.delaytime * 5)
if platform.system().lower() == 'windows':
sleep(10)
tdSql.error('select server_status()')

View File

@ -121,7 +121,7 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query(f'select count(*) from {stbname}')
if tb_num <= 1:
if len(tdSql.queryResult) != 0:
if len(tdSql.queryResult) != 1 and tdSql.queryResult[0][0] != 0:
tdLog.exit('delete case failure!')
else:
tdSql.checkEqual(tdSql.queryResult[0][0],(tb_num-1)*row_num)

View File

@ -51,6 +51,129 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult[0][0],rownum)
tdSql.query(f'select count({k}) from {ntbname} where ts <={self.ts+self.rowNum-2}')
tdSql.checkEqual(tdSql.queryResult[0][0],rownum-1)
def query_empty_stb(self):
tdSql.query(f'select count(*) from (select distinct tbname from {self.stbname})')
tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
tdSql.query(f'select count(*) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
function_names = ['count', 'hyperloglog']
for function_name in function_names:
tdSql.query(f'select {function_name}(tbname) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(c1) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(ts) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(1) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(c1),sum(c2),max(1) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.query(f'select sum(1),{function_name}(1),max(c2) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, 0)
tdSql.checkData(0, 2, None)
tdSql.query(f'select {function_name}(1),sum(1),max(c2),min(1),min(2),min(3),min(4),min(5),min(6),min(7),min(8) from {self.stbname}')
tdSql.checkRows(1)
tdSql.checkCols(11)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 10, None)
tdSql.query(f'select sum(1),max(c2),min(1),leastsquares(c1,1,1) from {self.stbname}')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} group by tbname')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} group by c1')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} group by t0')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by tbname')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by t0')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(1) from (select {function_name}(c1),sum(c1) from {self.stbname} group by c1)')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} interval(1s)')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by tbname interval(1s)')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.stbname} partition by c1 interval(1s)')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(1),sum(1) from (select {function_name}(1) from {self.stbname} group by tbname)')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
def query_empty_ntb(self):
tdSql.query(f'select count(*) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
function_names = ['count', 'hyperloglog']
for function_name in function_names:
tdSql.query(f'select {function_name}(tbname) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(c1) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(ts) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(1) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(c1),sum(c2),max(1) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.query(f'select sum(1),{function_name}(1),max(c2) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, 0)
tdSql.checkData(0, 2, None)
tdSql.query(f'select {function_name}(1),sum(1),max(c2),min(1),min(2),min(3),min(4),min(5),min(6),min(7),min(8) from {self.ntbname}')
tdSql.checkRows(1)
tdSql.checkCols(11)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 10, None)
tdSql.query(f'select sum(1),max(c2),min(1),leastsquares(c1,1,1) from {self.ntbname}')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} group by tbname')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} group by c1')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(1) from (select {function_name}(c1),sum(c1) from {self.ntbname} group by c1)')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} interval(1s)')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} partition by tbname interval(1s)')
tdSql.checkRows(0)
tdSql.query(f'select {function_name}(c1),sum(c1) from {self.ntbname} partition by c1 interval(1s)')
tdSql.checkRows(0)
tdSql.query(f'select count(1),sum(1) from (select count(1) from {self.ntbname} group by tbname)')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.checkData(0, 1, None)
def count_query_stb(self,column_dict,tag_dict,stbname,tbnum,rownum):
tdSql.query(f'select count(tbname) from {stbname}')
tdSql.checkEqual(tdSql.queryResult[0][0],tbnum*rownum)
@ -81,11 +204,11 @@ class TDTestCase:
def check_ntb(self):
tdSql.prepare()
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
tdSql.query(f'select count(tbname) from {self.ntbname}')
tdSql.checkRows(0)
self.query_empty_ntb()
tdSql.execute('flush database db')
tdSql.query(f'select count(tbname) from {self.ntbname}')
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
self.insert_data(self.column_dict,self.ntbname,self.rowNum)
self.count_query_ntb(self.column_dict,self.ntbname,self.rowNum)
tdSql.execute('flush database db')
@ -96,13 +219,11 @@ class TDTestCase:
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})')
tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
tdSql.query(f'select count(tbname) from {self.stbname}')
tdSql.checkRows(0)
self.query_empty_stb()
tdSql.execute('flush database db')
tdSql.query(f'select count(tbname) from {self.stbname}')
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
tdSql.query(f'SELECT count(*) from (select distinct tbname from {self.stbname})')
tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
for i in range(self.tbnum):

View File

@ -33,6 +33,19 @@ class TDTestCase:
f"create table {dbname}.ctb2 using {dbname}.stb tags (2)"
)
tdSql.execute(
f"create table {dbname}.tb_empty (ts timestamp, c0 int)"
)
tdSql.execute(
f"create table {dbname}.stb_empty (ts timestamp, c0 int) tags (t0 int)"
)
tdSql.execute(
f"create table {dbname}.ctb1_empty using {dbname}.stb tags (1)"
)
tdSql.execute(
f"create table {dbname}.ctb2_empty using {dbname}.stb tags (2)"
)
tdSql.execute(
f"insert into {dbname}.tb values (now(), NULL)")
@ -94,6 +107,61 @@ class TDTestCase:
tdSql.checkRows(1)
tdSql.checkData(0, 0, 0)
# test empty table/input
tdSql.query(f"select count(*) from {dbname}.tb where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select count(ts) from {dbname}.stb where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select count(c0) from {dbname}.ctb1 where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select count(1) from {dbname}.ctb2 where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select count(*) from {dbname}.tb_empty")
tdSql.checkRows(0)
tdSql.query(f"select count(ts) from {dbname}.stb_empty")
tdSql.checkRows(0)
tdSql.query(f"select count(c0) from {dbname}.ctb1_empty")
tdSql.checkRows(0)
tdSql.query(f"select count(1) from {dbname}.ctb2_empty")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(c0) from {dbname}.tb where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(ts) from {dbname}.stb where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(1) from {dbname}.ctb1 where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(1) from {dbname}.ctb2 where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(c0) from {dbname}.tb_empty")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(ts) from {dbname}.stb_empty")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(1) from {dbname}.ctb1_empty")
tdSql.checkRows(0)
tdSql.query(f"select hyperloglog(1) from {dbname}.ctb2_empty")
tdSql.checkRows(0)
tdSql.query(f"select count(*), hyperloglog(c0), sum(1), max(c0) from {dbname}.tb where ts > now + 1h")
tdSql.checkRows(0)
tdSql.query(f"select count(*), hyperloglog(c0), sum(1), max(c0) from {dbname}.tb_empty")
tdSql.checkRows(0)
def run(self):
tdSql.prepare()

View File

@ -402,7 +402,8 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query(f"select count(c1) from {dbname}.ct4 where t1 = 1 ")
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.checkData(0,0,0)
tdSql.query(f"select last_row(c1) ,last(c1) from {dbname}.stb1 where c1 is null")
tdSql.checkRows(1)

View File

@ -298,7 +298,7 @@ class TDTestCase:
if (platform.system().lower() == 'windows' and pre_result.dtype == 'int32'):
pre_result = np.array(pre_result, dtype = 'int64')
pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k
#pre_mavg = pre_mavg = np.convolve(pre_result, np.ones(k), "valid")[offset_val:]/k
tdSql.query(self.mavg_query_form(
sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr,
table_expr=table_expr, condition=condition

View File

@ -76,6 +76,32 @@ static void msg_process(TAOS_RES* msg) {
}
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
/* test for TD-20612 start*/
// pRes = taos_query(pConn,"create table tb1 (ts timestamp, c1 int, c2 int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn,"insert into tb1 (ts, c1) values(1669092069069, 0)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn,"insert into tb1 (ts, c2) values(1669092069069, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// return 0;
/* test for TD-20612 end*/
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");