Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TS-3222

This commit is contained in:
wangmm0220 2023-04-21 17:46:35 +08:00
commit 6efbcfec4b
46 changed files with 700 additions and 317 deletions

View File

@ -225,6 +225,9 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
/* --------------------------schemaless INTERFACE------------------------------- */
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);

View File

@ -379,6 +379,8 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2
int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime);
#ifdef __cplusplus
}
#endif

View File

@ -273,6 +273,7 @@ typedef struct SStreamId {
typedef struct SCheckpointInfo {
int64_t id;
int64_t version; // offset in WAL
int64_t currentVer;// current offset in WAL, not serialize it
} SCheckpointInfo;
typedef struct SStreamStatus {
@ -345,7 +346,7 @@ typedef struct SStreamMeta {
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
int8_t walScan;
int32_t walScan;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -537,6 +538,7 @@ void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);

View File

@ -102,7 +102,6 @@ bool taosAssertRelease(bool condition);
void taosLogCrashInfo(char *nodeType, char *pMsg, int64_t msgLen, int signum, void *sigInfo);
void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr *pFd);
void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile);
int32_t taosGenCrashJsonMsg(int signum, char **pMsg, int64_t clusterId, int64_t startTime);
// clang-format off
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", DEBUG_FATAL, tsLogEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }}

View File

@ -42,8 +42,9 @@ if [ "$DISABLE_ADAPTER" = "0" ]; then
done
fi
# if has mnode ep set or the host is first ep or not for cluster, just start.
if [ -f "$DATA_DIR/dnode/mnodeEpSet.json" ] ||
# if dnode has been created or has mnode ep set or the host is first ep or not for cluster, just start.
if [ -f "$DATA_DIR/dnode/dnode.json" ] ||
[ -f "$DATA_DIR/dnode/mnodeEpSet.json" ] ||
[ "$TAOS_FQDN" = "$FIRST_EP_HOST" ]; then
$@
# others will first wait the first ep ready.

View File

@ -197,7 +197,8 @@ if [[ $productName == "TDengine" ]]; then
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ "$osType" != "Darwin" ]; then
[ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
jars=$(ls ${build_dir}/lib/*.jar 2>/dev/null|wc -l)
[ "${jars}" != "0" ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
fi
git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go
rm -rf ${install_dir}/connector/go/.git ||:

View File

@ -338,7 +338,20 @@ if [ "$verMode" == "cluster" ] || [ "$verMode" == "cloud" ]; then
connector_dir="${code_dir}/connector"
mkdir -p ${install_dir}/connector
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
[ -f ${build_dir}/lib/*.jar ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
tmp_pwd=`pwd`
cd ${install_dir}/connector
if [ ! -d taos-connector-jdbc ];then
git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||:
fi
cd taos-connector-jdbc
mvn clean package -Dmaven.test.skip=true
echo ${build_dir}/lib/
cp target/*.jar ${build_dir}/lib/
cd ${install_dir}/connector
rm -rf taos-connector-jdbc
cd ${tmp_pwd}
jars=$(ls ${build_dir}/lib/*.jar 2>/dev/null|wc -l)
[ "${jars}" != "0" ] && cp ${build_dir}/lib/*.jar ${install_dir}/connector || :
git clone --depth 1 https://github.com/taosdata/driver-go ${install_dir}/connector/go
rm -rf ${install_dir}/connector/go/.git ||:

View File

@ -80,6 +80,7 @@ typedef struct {
int64_t appId;
// ctl
int8_t threadStop;
int8_t quitByKill;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary;

View File

@ -70,7 +70,7 @@ extern "C" {
#define VALUE_LEN 6
#define OTD_JSON_FIELDS_NUM 4
#define MAX_RETRY_TIMES 100
#define MAX_RETRY_TIMES 10
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
typedef enum {

View File

@ -845,7 +845,12 @@ static void hbStopThread() {
return;
}
taosThreadJoin(clientHbMgr.thread, NULL);
// thread quit mode kill or inner exit from self-thread
if (clientHbMgr.quitByKill) {
taosThreadKill(clientHbMgr.thread, 0);
} else {
taosThreadJoin(clientHbMgr.thread, NULL);
}
tscDebug("hb thread stopped");
}
@ -1037,3 +1042,8 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
void taos_set_hb_quit(int8_t quitByKill) {
clientHbMgr.quitByKill = quitByKill;
}

View File

@ -535,7 +535,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
if (index) {
if (colField[*index].type != kv->type) {
uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
return TSDB_CODE_TSC_INVALID_VALUE;
return TSDB_CODE_SML_INVALID_DATA;
}
if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
@ -1494,8 +1494,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
do {
code = smlModifyDBSchemas(info);
if (code == 0) break;
taosMsleep(500);
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA) break;
taosMsleep(100);
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);

View File

@ -2503,9 +2503,11 @@ _exit:
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) {
int32_t code = 0;
ASSERT(pColData->type == pBind->buffer_type);
if (IS_VAR_DATA_TYPE(pBind->buffer_type)) { // var-length data type
if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) {
ASSERT(pColData->type == pBind->buffer_type);
}
if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type
for (int32_t i = 0; i < pBind->num; ++i) {
if (pBind->is_null && pBind->is_null[i]) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "tmisce.h"
#include "tjson.h"
#include "tglobal.h"
#include "tlog.h"
#include "tname.h"
@ -87,3 +88,63 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet) {
return ep;
}
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return -1;
char tmp[4096] = {0};
tjsonAddDoubleToObject(pJson, "reportVersion", 1);
tjsonAddIntegerToObject(pJson, "clusterId", clusterId);
tjsonAddIntegerToObject(pJson, "startTime", startTime);
// Do NOT invoke the taosGetFqdn here.
// this function may be invoked when memory exception occurs,so we should assume that it is running in a memory locked
// environment. The lock operation by taosGetFqdn may cause this program deadlock.
tjsonAddStringToObject(pJson, "fqdn", tsLocalFqdn);
tjsonAddIntegerToObject(pJson, "pid", taosGetPId());
taosGetAppName(tmp, NULL);
tjsonAddStringToObject(pJson, "appName", tmp);
if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) {
tjsonAddStringToObject(pJson, "os", tmp);
}
float numOfCores = 0;
if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) {
tjsonAddStringToObject(pJson, "cpuModel", tmp);
tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores);
} else {
tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores);
}
snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB);
tjsonAddStringToObject(pJson, "memory", tmp);
tjsonAddStringToObject(pJson, "version", version);
tjsonAddStringToObject(pJson, "buildInfo", buildinfo);
tjsonAddStringToObject(pJson, "gitInfo", gitinfo);
tjsonAddIntegerToObject(pJson, "crashSig", signum);
tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs());
#ifdef _TD_DARWIN_64
taosLogTraceToBuf(tmp, sizeof(tmp), 4);
#elif !defined(WINDOWS)
taosLogTraceToBuf(tmp, sizeof(tmp), 3);
#else
taosLogTraceToBuf(tmp, sizeof(tmp), 8);
#endif
tjsonAddStringToObject(pJson, "stackInfo", tmp);
char* pCont = tjsonToString(pJson);
tjsonDelete(pJson);
*pMsg = pCont;
return TSDB_CODE_SUCCESS;
}

View File

@ -119,6 +119,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
pVnode->pFetchQ->threadId);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
tqNotifyClose(pVnode->pImpl->pTq);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
@ -141,7 +142,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
if (commitAndRemoveWal) {
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
tfsRmdir(pMgmt->pTfs, path);

View File

@ -1437,7 +1437,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
return 0;
}
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) {
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
@ -1459,7 +1459,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) {
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen, .info = pReq->info};
SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
@ -1495,7 +1495,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) {
goto _OVER;
}
code = mndTrimDb(pMnode, pDb, pReq);
code = mndTrimDb(pMnode, pDb);
_OVER:
if (code != 0) {

View File

@ -180,16 +180,9 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
// tq util
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
#ifdef __cplusplus
}
#endif

View File

@ -190,6 +190,7 @@ int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int tqInit();
void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,

View File

@ -154,6 +154,31 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq);
}
void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) {
taosWLockLatch(&pTq->pStreamMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__STOP;
int64_t st = taosGetTimestampMs();
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 "ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
}
}
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
int64_t consumerId, int32_t type) {
int32_t len = 0;
@ -573,6 +598,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
pTask->chkInfo.version = ver;
pTask->chkInfo.currentVer = ver;
// expand executor
if (pTask->fillHistory) {
@ -966,14 +992,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
*pRef = 1;
taosWLockLatch(&pTq->pStreamMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pIter == NULL) {
break;
}
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver);
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);
if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
@ -983,15 +1016,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->id.taskId);
atomic_sub_fetch_32(pRef, 1);
taosFreeQitem(pRefBlock);
continue;
}
if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->id.taskId);
qError("s-task:%s stream task launch failed", pTask->id.idStr);
continue;
}
@ -1000,8 +1031,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
}
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
int32_t ref = atomic_sub_fetch_32(pRef, 1);
/*A(ref >= 0);*/
if (ref == 0) {
blockDataDestroy(pDelBlock);
taosMemoryFree(pRef);
@ -1032,23 +1064,9 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
}
blockDataDestroy(pDelBlock);
#endif
return 0;
}
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
const char* key, int64_t ver) {
doSaveTaskOffset(pOffsetStore, key, ver);
int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver);
// remove the offset, if all functions are completed successfully.
if (code == TSDB_CODE_SUCCESS) {
tqOffsetDelete(pOffsetStore, key);
}
return code;
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0
void* pIter = NULL;
@ -1309,9 +1327,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return -1;
}
tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
initOffsetForAllRestoreTasks(pTq);
tqDebug("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = WAL_READ_TASKS_ID;

View File

@ -1023,6 +1023,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
@ -1039,5 +1040,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0;
}

View File

@ -15,121 +15,125 @@
#include "tq.h"
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually.
int tqStreamTasksScanWal(STQ* pTq) {
int32_t tqStreamTasksScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs();
while (1) {
tqInfo("vgId:%d continue check if data in wal are available", vgId);
int32_t scan = pMeta->walScan;
tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
// check all restore tasks
bool allFull = true;
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
bool shouldIdle = true;
createStreamRunReq(pTq->pStreamMeta, &shouldIdle);
int32_t times = 0;
if (allFull) {
if (shouldIdle) {
taosWLockLatch(&pMeta->lock);
pMeta->walScan -= 1;
times = pMeta->walScan;
ASSERT(pMeta->walScan >= 0);
if (pMeta->walScan <= 0) {
taosWUnLockLatch(&pMeta->lock);
break;
}
taosWUnLockLatch(&pMeta->lock);
tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times);
tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times);
}
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);
int64_t el = (taosGetTimestampMs() - st);
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el);
return 0;
}
//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
// int32_t numOfTask = taosArrayGetSize(pTaskList);
// if (numOfTask <= 0) {
// return TSDB_CODE_SUCCESS;
// }
//
// // todo: add lock
// for (int32_t i = 0; i < numOfTask; ++i) {
// SStreamTask* pTask = taosArrayGetP(pTaskList, i);
// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64,
// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
//
// // NOTE: do not change the following order
// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
// }
//
// return TSDB_CODE_SUCCESS;
//}
int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
void* pIter = NULL;
int32_t vgId = pStreamMeta->vgId;
*pScanIdle = true;
bool allWalChecked = true;
tqDebug("vgId:%d start to check wal to extract new submit block", vgId);
while (1) {
taosWLockLatch(&pStreamMeta->lock);
while(1) {
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
taosArrayPush(pTaskIdList, &pTask->id.taskId);
}
taosWUnLockLatch(&pStreamMeta->lock);
return pTaskIdList;
}
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);
// update the new task number
numOfTasks = taosArrayGetSize(pTaskIdList);
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskIdList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
if (pTask == NULL) {
continue;
}
int32_t status = pTask->status.taskStatus;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
tqDebug("s-task:%s not source task, no need to start", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// check if offset value exists
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) {
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
*pScanIdle = false;
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
ASSERT(pOffset != NULL);
// seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
@ -137,28 +141,32 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
allWalChecked = false;
noNewDataInWal = false;
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pOffset->val.version);
pTask->chkInfo.currentVer);
} else {
// do nothing
tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
}
streamDataSubmitDestroy(p);
taosFreeQitem(p);
streamMetaReleaseTask(pStreamMeta, pTask);
}
if (allWalChecked) {
// all wal are checked, and no new data available in wal.
if (noNewDataInWal) {
*pScanIdle = true;
}
taosArrayDestroy(pTaskIdList);
return 0;
}

View File

@ -25,21 +25,6 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
return taosStrdup(buf);
}
// stream_task:stream_id:task_id
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) {
int32_t n = 12;
char* p = dst;
memcpy(p, "stream_task:", n);
p += n;
int32_t inc = tintToHex(streamId, p);
p += inc;
*(p++) = ':';
tintToHex(taskId, p);
}
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
@ -55,75 +40,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI
return TSDB_CODE_SUCCESS;
}
void initOffsetForAllRestoreTasks(STQ* pTq) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version);
}
}
}
void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, ver);
}
}
}
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
// keep the offset info in the offset store
tqOffsetWrite(pOffsetStore, &offset);
}
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
pRsp->reqOffset = pReq->reqOffset;

View File

@ -1108,6 +1108,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanBaseInfo->dataReader = NULL;
// let's seek to the next version in wal file
int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1;

View File

@ -2541,6 +2541,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
while (1) {
if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
}
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
@ -2635,6 +2649,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
taosArrayPush(pInfo->pUpdated, pIte);
}
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);

View File

@ -79,6 +79,26 @@ ENDIF ()
target_link_libraries(
udf1 PUBLIC os ${LINK_JEMALLOC})
add_library(udf1_dup STATIC MODULE test/udf1_dup.c)
target_include_directories(
udf1_dup
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(udf1_dup jemalloc)
ENDIF ()
target_link_libraries(
udf1_dup PUBLIC os ${LINK_JEMALLOC})
add_library(udf2 STATIC MODULE test/udf2.c)
target_include_directories(
udf2
@ -99,6 +119,26 @@ target_link_libraries(
udf2 PUBLIC os ${LINK_JEMALLOC}
)
add_library(udf2_dup STATIC MODULE test/udf2_dup.c)
target_include_directories(
udf2_dup
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(udf2_dup jemalloc)
ENDIF ()
target_link_libraries(
udf2_dup PUBLIC os ${LINK_JEMALLOC}
)
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
add_executable(udfd src/udfd.c)
target_include_directories(

View File

@ -0,0 +1,42 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
DLL_EXPORT int32_t udf1_dup_init() { return 0; }
DLL_EXPORT int32_t udf1_dup_destroy() { return 0; }
DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
}
if (j == block->numOfCols) {
int32_t luckyNum = 2;
udfColDataSet(resultCol, i, (char *)&luckyNum, false);
}
}
// to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif
#ifdef WINDOWS
Sleep(1);
#endif
resultData->numOfRows = block->numOfRows;
return 0;
}

View File

@ -0,0 +1,78 @@
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "taosudf.h"
DLL_EXPORT int32_t udf2_dup_init() { return 0; }
DLL_EXPORT int32_t udf2_dup_destroy() { return 0; }
DLL_EXPORT int32_t udf2_dup_start(SUdfInterBuf* buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 1;
return 0;
}
DLL_EXPORT int32_t udf2_dup(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
double sumSquares = 0;
if (interBuf->numOfResult == 1) {
sumSquares = *(double*)interBuf->buf;
}
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];
if (udfColDataIsNull(col, j)) {
continue;
}
switch (col->colMeta.type) {
case TSDB_DATA_TYPE_INT: {
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += (double)num * num;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* cell = udfColDataGetData(col, j);
double num = *(double*)cell;
sumSquares += num * num;
break;
}
default:
break;
}
++numNotNull;
}
}
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
if (interBuf->numOfResult == 0 && numNotNull == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
return 0;
}
DLL_EXPORT int32_t udf2_dup_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
}
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares) + 100;
resultData->bufLen = sizeof(double);
resultData->numOfResult = 1;
return 0;
}

View File

@ -251,7 +251,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
goto _return;
}
if (bind[c].buffer_type != pColSchema->type) {
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}

View File

@ -692,7 +692,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreC
len = tGetToken(&str[*i + t0.n + 1], &type);
// only id and string are valid
if ((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) {
if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) || ((TK_NK_STRING != type) && (TK_NK_ID != type))) {
t0.type = TK_NK_ILLEGAL;
t0.n = 0;

View File

@ -2526,7 +2526,7 @@ static bool tbCntScanOptIsEligibleAggFuncs(SNodeList* pAggFuncs) {
return false;
}
}
return true;
return LIST_LENGTH(pAggFuncs) > 0;
}
static bool tbCntScanOptIsEligibleAgg(SAggLogicNode* pAgg) {

View File

@ -16,7 +16,7 @@
#include "streamInc.h"
#include "ttimer.h"
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 100000
#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 3000
int32_t streamInit() {
int8_t old;

View File

@ -17,6 +17,11 @@
#define STREAM_EXEC_MAX_BATCH_NUM 100
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
@ -66,7 +71,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
// pExecutor
while (1) {
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
if (streamTaskShouldStop(&pTask->status)) {
return 0;
}
@ -106,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
continue;
}
qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId);
qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId);
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
@ -134,7 +139,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0;
while (1) {
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
if (streamTaskShouldStop(&pTask->status)) {
taosArrayDestroy(pRes);
return 0;
}
@ -270,7 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
}
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
if (streamTaskShouldStop(&pTask->status)) {
if (pInput) {
streamFreeQitem(pInput);
}
@ -301,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId};
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
@ -368,7 +373,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue)) {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
streamSchedExec(pTask);
}
}

View File

@ -84,11 +84,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
tdbClose(pMeta->db);
void* pIter = NULL;
// while(pMeta->walScan) {
// qDebug("wait stream daemon quit");
// taosMsleep(100);
// }
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
@ -102,7 +97,6 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
tFreeStreamTask(pTask);
/*streamMetaReleaseTask(pMeta, pTask);*/
}
taosHashCleanup(pMeta->pTasks);
@ -197,10 +191,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return *ppTask;
if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return *ppTask;
}
}
taosRUnLockLatch(&pMeta->lock);
@ -211,7 +207,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
ASSERT(left >= 0);
if (left == 0) {
ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING);
ASSERT(streamTaskShouldStop(&pTask->status));
tFreeStreamTask(pTask);
}
}
@ -222,11 +218,8 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
/*if (pTask->timer) {
* taosTmrStop(pTask->timer);*/
/*pTask->timer = NULL;*/
/*}*/
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask);

View File

@ -1008,7 +1008,7 @@ int32_t taosGetFqdn(char *fqdn) {
// hints.ai_family = AF_INET;
strcpy(fqdn, hostname);
strcpy(fqdn + strlen(hostname), ".local");
#else // __APPLE__
#else // linux
struct addrinfo hints = {0};
struct addrinfo *result = NULL;
hints.ai_flags = AI_CANONNAME;
@ -1020,7 +1020,7 @@ int32_t taosGetFqdn(char *fqdn) {
}
strcpy(fqdn, result->ai_canonname);
freeaddrinfo(result);
#endif // __APPLE__
#endif // linux
return 0;
}

View File

@ -17,7 +17,6 @@
#include "tlog.h"
#include "os.h"
#include "tconfig.h"
#include "tutil.h"
#include "tjson.h"
#include "tglobal.h"
@ -781,65 +780,6 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char
return true;
}
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return -1;
char tmp[4096] = {0};
tjsonAddDoubleToObject(pJson, "reportVersion", 1);
tjsonAddIntegerToObject(pJson, "clusterId", clusterId);
tjsonAddIntegerToObject(pJson, "startTime", startTime);
taosGetFqdn(tmp);
tjsonAddStringToObject(pJson, "fqdn", tmp);
tjsonAddIntegerToObject(pJson, "pid", taosGetPId());
taosGetAppName(tmp, NULL);
tjsonAddStringToObject(pJson, "appName", tmp);
if (taosGetOsReleaseName(tmp, sizeof(tmp)) == 0) {
tjsonAddStringToObject(pJson, "os", tmp);
}
float numOfCores = 0;
if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) {
tjsonAddStringToObject(pJson, "cpuModel", tmp);
tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores);
} else {
tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores);
}
snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB);
tjsonAddStringToObject(pJson, "memory", tmp);
tjsonAddStringToObject(pJson, "version", version);
tjsonAddStringToObject(pJson, "buildInfo", buildinfo);
tjsonAddStringToObject(pJson, "gitInfo", gitinfo);
tjsonAddIntegerToObject(pJson, "crashSig", signum);
tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs());
#ifdef _TD_DARWIN_64
taosLogTraceToBuf(tmp, sizeof(tmp), 4);
#elif !defined(WINDOWS)
taosLogTraceToBuf(tmp, sizeof(tmp), 3);
#else
taosLogTraceToBuf(tmp, sizeof(tmp), 8);
#endif
tjsonAddStringToObject(pJson, "stackInfo", tmp);
char* pCont = tjsonToString(pJson);
tjsonDelete(pJson);
*pMsg = pCont;
return TSDB_CODE_SUCCESS;
}
void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, void *sigInfo) {
const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL;

View File

@ -68,8 +68,8 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1"
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan"

View File

@ -104,4 +104,9 @@ if $data62 != 5 then
return -1
endi
sql select distinct db_name from information_schema.ins_tables;
print $rows
if $rows != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -37,7 +37,7 @@ if $loop_count == 20 then
endi
if $rows != 4 then
print =====rows=$rows, expect 4
print =====rows=$rows expect 4
goto loop0
endi

View File

@ -0,0 +1,71 @@
drop database if exists d0;
create database d0 replica 1 keep 365 minRows 100 maxRows 4096 comp 2 vgroups 2 precision 'ms';
use d0;
create table if not exists almlog (starttime timestamp,endtime timestamp,durationtime int, alarmno int, alarmtext nchar(256),isactive nchar(64)) tags (mcid nchar(16));
create table if not exists mplog (starttime timestamp,mpid int, paravalue nchar(256),mptype nchar(32)) tags (mcid nchar(16));
create table if not exists mdlog (starttime timestamp,endtime timestamp,durationtime int, statuscode int, npcgmname nchar(256),attr int) tags (mcid nchar(16));
create table if not exists nrglog (updatetime timestamp,energyvalue double,enerygyincrease double) tags (mcid nchar(16),enerygytype nchar(16));
create table almlog_m201 using almlog tags("m201");
create table almlog_m0103 using almlog tags("m0103");
create table almlog_m0103_20031 using almlog tags("m0103");
create table almlog_m0103_20032 using almlog tags("m0103");
create table almlog_m0103_3003 using almlog tags("m0103");
create table almlog_m0103_20033 using almlog tags("m0103");
create table almlog_m0103_30031 using almlog tags("m0103");
create table almlog_m0201 using almlog tags("m0201");
create table almlog_m0102 using almlog tags("m0102");
create table almlog_m0101 using almlog tags("m0101");
create table almlog_m1002 using almlog tags("m1002");
create table mplog_m0204_4 using mplog tags("m0204");
create table mplog_m0204_5 using mplog tags("m0204");
create table mplog_m0204_6 using mplog tags("m0204");
create table mplog_m0204_12 using mplog tags("m0204");
create table mplog_m0204 using mplog tags("m0204");
create table mplog_m201 using mplog tags("m201");
create table mplog_m0102 using mplog tags("m0102");
create table mplog_m1101 using mplog tags("m1101");
create table mdlog_m0102 using mplog tags("m0102");
create table mdlog_m0504 using mplog tags("m0504");
create table mdlog_m0505 using mplog tags("m0505");
create table mdlog_m0507 using mplog tags("m0507");
create table mdlog_m1002 using mplog tags("m1002");
create table mdlog_m3201 using mplog tags("m3201");
create table mdlog_m0201 using mplog tags("m0201");
create table mdlog_m1102 using mplog tags("m1102");
create table mdlog_m201 using mplog tags("m201");
create table nrglog_m201_electricvalue1 using nrglog tags("m201","electricValue1");
create table nrglog_m201_oilvalue1 using nrglog tags("m201","oilValue1");
create table nrglog_m201_gasvalue1 using nrglog tags("m201","gasValue1");
create table nrglog_m201_watervalue1 using nrglog tags("m201","waterValue1");
create table nrglog_m0101_oilvalue1 using nrglog tags("m0101","oilValue1");
create table nrglog_m0101_watervalue1 using nrglog tags("m0101","waterValue1");
create table nrglog_m0102_gasvalue1 using nrglog tags("m0102","gasValue1");
create table nrglog_m1903 using nrglog tags("m1903",NULL);
create table nrglog_m2802 using nrglog tags("m2802",NULL);
create table nrglog_m2101 using nrglog tags("m2101",NULL);
create table nrglog_m0102 using nrglog tags("m0102",NULL);
create table nrglog_m0101_electricvalue1 using nrglog tags("m0101","electricValue1");
create table nrglog_m0101_gasvalue1 using nrglog tags("m0101","gasValue1");
create table nrglog_m0102_electricvalue1 using nrglog tags("m0102","electricValue1");
create table nrglog_m0102_oilvalue1 using nrglog tags("m0102","oilValue1");
create table nrglog_m0102_watervalue1 using nrglog tags("m0102","waterValue1");
insert into almlog_m0103 values(now,now+1s,10,0,'','dismissed');
insert into almlog_m0103_20031 values(now,now+1s,10,20031,'','dismissed');
insert into almlog_m0103_20032 values(now,now+1s,10,20032,'','dismissed');
insert into almlog_m0103_3003 values(now,now+1s,10,3003,'','dismissed');
insert into almlog_m0103_20033 values(now,now+1s,10,20033,'','dismissed');
insert into almlog_m0103_30031 values(now,now+1s,10,30031,'','dismissed');
flush database d0;
show table tags from almlog;
select *,tbname from d0.almlog where mcid='m0103';
select table_name from information_schema.ins_tables where db_name='d0';

View File

@ -53,18 +53,8 @@
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [
{
"type": "INT",
"count": 4094
}
],
"tags": [
{
"type": "TINYINT",
"count": 1
}
]
"columns": [{ "type": "INT","count": 4093}],
"tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}]
}
]
}

View File

@ -98,9 +98,12 @@ class TDTestCase:
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os.system(f" cd {bPath} ")
os.system(f" cd {bPath} ")
def is_list_same_as_ordered_list(self,unordered_list, ordered_list):
sorted_list = sorted(unordered_list)
return sorted_list == ordered_list
def run(self):
scriptsPath = os.path.dirname(os.path.realpath(__file__))
distro_id = distro.id()
@ -146,6 +149,8 @@ class TDTestCase:
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd)
@ -220,6 +225,17 @@ class TDTestCase:
tdLog.exit("%s(%d) failed" % args)
tdsql.query("show streams;")
tdsql.checkRows(2)
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
tdsql.checkRows(6)
expectList = [0,3003,20031,20032,20033,30031]
resultList = []
for i in range(6):
resultList.append(tdsql.queryResult[i][3])
print(resultList)
if self.is_list_same_as_ordered_list(resultList,expectList):
print("The unordered list is the same as the ordered list.")
else:
tdlog.error("The unordered list is not the same as the ordered list.")
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")

View File

@ -47,17 +47,27 @@ class TDTestCase:
if platform.system().lower() == 'windows':
self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1_dup = subprocess.Popen('(for /r %s %%i in ("udf1_dup.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2 = subprocess.Popen('(for /r %s %%i in ("udf2.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2_dup = subprocess.Popen('(for /r %s %%i in ("udf2_dup.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
if (not tdDnodes.dnodes[0].remoteIP == ""):
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\")
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1_dup.so',projPath+"\\debug\\build\\lib\\")
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2.so',projPath+"\\debug\\build\\lib\\")
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2_dup.so',projPath+"\\debug\\build\\lib\\")
self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so')
self.libudf1_dup = self.libudf1_dup.replace('udf1_dup.dll','libudf1_dup.so')
self.libudf2 = self.libudf2.replace('udf2.dll','libudf2.so')
self.libudf2_dup = self.libudf2_dup.replace('udf2_dup.dll','libudf2_dup.so')
else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1_dup = subprocess.Popen('find %s -name "libudf1_dup.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2_dup = subprocess.Popen('find %s -name "libudf2_dup.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
self.libudf1_dup = self.libudf1_dup.replace('\r','').replace('\n','')
self.libudf2 = self.libudf2.replace('\r','').replace('\n','')
self.libudf2_dup = self.libudf2_dup.replace('\r','').replace('\n','')
def prepare_data(self):
@ -174,10 +184,12 @@ class TDTestCase:
# create scalar functions
tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1)
tdSql.execute("create function udf1_dup as '%s' outputtype int;"%self.libudf1_dup)
# create aggregate functions
tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2)
tdSql.execute("create aggregate function udf2_dup as '%s' outputtype double bufSize 8;"%self.libudf2_dup)
functions = tdSql.getResult("show functions")
function_nums = len(functions)
@ -188,6 +200,13 @@ class TDTestCase:
# scalar functions
# udf1_dup
tdSql.query("select udf1(num1) ,udf1_dup(num1) from tb")
tdSql.checkData(1,0,1)
tdSql.checkData(1,1,2)
tdSql.checkData(2,0,1)
tdSql.checkData(2,1,2)
tdSql.execute("use db ")
tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb")
tdSql.checkData(0,0,None)
@ -238,6 +257,10 @@ class TDTestCase:
# aggregate functions
tdSql.query("select udf2(num1) ,udf2_dup(num1) from tb")
val = tdSql.queryResult[0][0] + 100
tdSql.checkData(0,1,val)
tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb")
tdSql.checkData(0,0,15.362291496)
tdSql.checkData(0,1,10000949.553189287)

View File

@ -29,6 +29,9 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.dbname = 'db_test'
self.ns_dbname = 'ns_test'
self.us_dbname = 'us_test'
self.ms_dbname = 'ms_test'
self.setsql = TDSetSql()
self.stbname = 'stb'
self.ntbname = 'ntb'
@ -220,11 +223,45 @@ class TDTestCase:
tdSql.query(f'select {func}(*) from {self.stbname}')
tdSql.execute(f'drop table {self.stbname}')
tdSql.execute(f'drop database {self.dbname}')
def precision_now_check(self):
for dbname in [self.ms_dbname, self.us_dbname, self.ns_dbname]:
self.ts = 1537146000000
if dbname == self.us_dbname:
self.ts = int(self.ts*1000)
precision = "us"
elif dbname == self.ns_dbname:
precision = "ns"
self.ts = int(self.ts*1000000)
else:
precision = "ms"
self.ts = int(self.ts)
tdSql.execute(f'drop database if exists {dbname}')
tdSql.execute(f'create database if not exists {dbname} precision "{precision}"')
tdSql.execute(f'use {dbname}')
self.base_data = {
'tinyint':self.tinyint_val
}
self.column_dict = {
'col1': 'tinyint'
}
for col_name,col_type in self.column_dict.items():
tdSql.execute(f'create table if not exists {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)')
for i in range(self.tbnum):
tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags(1)')
self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data)
tdSql.query(f'select * from {self.stbname}')
tdSql.checkEqual(tdSql.queryRows, self.tbnum*self.rowNum)
tdSql.execute(f'delete from {self.stbname} where ts < now()')
tdSql.query(f'select * from {self.stbname}')
tdSql.checkEqual(tdSql.queryRows, 0)
def run(self):
self.delete_data_stb()
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
self.delete_data_stb()
self.precision_now_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)

View File

@ -82,7 +82,7 @@ class TDTestCase:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
tdLog.info("row[0][1]: %d, row[1][1]: %d"%(tdSql.getData(0, 1), tdSql.getData(1, 1)))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
@ -122,6 +122,7 @@ class TDTestCase:
os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tdLog.info("start create tables......")
tsql.execute("create database if not exists %s vgroups %d wal_retention_period 3600"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
@ -137,11 +138,11 @@ class TDTestCase:
tsql.execute(sql)
event.set()
tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
tdLog.info("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tdLog.info("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
@ -163,7 +164,7 @@ class TDTestCase:
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
tdLog.info("insert data ............ [OK]")
return
def prepareEnv(self, **parameterDict):
@ -286,7 +287,7 @@ class TDTestCase:
prepareEnvThread.start()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
topicName1 = 'topic_db11'
tdSql.execute("create topic %s as database %s" %(topicName1, parameterDict['dbName']))
consumerId = 0

View File

@ -42,7 +42,7 @@ class TDTestCase:
'showRow': 1}
topicNameList = ['topic1', 'topic2']
expectRowsList = []
queryRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName']))
@ -60,7 +60,7 @@ class TDTestCase:
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
queryRowsList.append(tdSql.getRows())
# create one stb2
paraDict["stbName"] = 'stb2'
@ -77,7 +77,7 @@ class TDTestCase:
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# queryRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
@ -99,7 +99,8 @@ class TDTestCase:
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("wait consumer commit notify")
tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
# tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
tmqCom.getStartConsumeNotifyFromTmqsim(rows=2)
tdLog.info("pkill one consume processor")
tmqCom.stopTmqSimProcess('tmq_sim_new')
@ -109,19 +110,21 @@ class TDTestCase:
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actTotalRows = 0
actConsumTotalRows = 0
for i in range(len(resultList)):
actTotalRows += resultList[i]
actConsumTotalRows += resultList[i]
tdLog.info("act consumer1 rows: %d, consumer2 rows: %d"%(resultList[0], resultList[1]))
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectTotalRows = 0
for i in range(len(expectRowsList)):
expectTotalRows += expectRowsList[i]
queryRowsList.append(tdSql.getRows())
queryTotalRows = 0
for i in range(len(queryRowsList)):
queryTotalRows += queryRowsList[i]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(actTotalRows, expectTotalRows))
if expectTotalRows <= resultList[0]:
tdLog.info("act consume rows: %d should >= expect consume rows: %d"%(actTotalRows, expectTotalRows))
tdLog.info("act consume rows: %d, query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
if actConsumTotalRows < queryTotalRows:
tdLog.info("act consume rows: %d should >= query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
tdLog.exit("0 tmq consume rows error!")
# time.sleep(10)
@ -130,9 +133,95 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 10,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic3', 'topic4']
queryRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
# queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
queryRowsList.append(tdSql.getRows())
# create one stb2
paraDict["stbName"] = 'stb2'
# queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
queryRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
paraDict["rowsPerTbl"] = 5000
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = "%s,%s"%(topicNameList[0],topicNameList[1])
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("start consume processor 2")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],'cdb',0,1)
tdLog.info("wait the consume result")
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumTotalRows = 0
for i in range(len(resultList)):
actConsumTotalRows += resultList[i]
tdLog.info("act consumer1 rows: %d, consumer2 rows: %d"%(resultList[0], resultList[1]))
queryTotalRows = 0
for i in range(len(queryRowsList)):
queryTotalRows += queryRowsList[i]
tdLog.info("act consume rows: %d, query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
if actConsumTotalRows < queryTotalRows:
tdLog.info("act consume rows: %d should >= query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
tdLog.exit("0 tmq consume rows error!")
# time.sleep(10)
# for i in range(len(topicNameList)):
# tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()

View File

@ -58,7 +58,6 @@ int32_t shellRunSingleCommand(char *command) {
}
if (shellRegexMatch(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
shellWriteHistory();
return -1;
}
@ -887,7 +886,6 @@ void shellWriteHistory() {
}
i = (i + 1) % SHELL_MAX_HISTORY_SIZE;
}
taosFsyncFile(pFile);
taosCloseFile(&pFile);
}

View File

@ -83,6 +83,9 @@ int main(int argc, char *argv[]) {
#endif
taos_init();
// kill heart-beat thread when quit
taos_set_hb_quit(1);
if (shell.args.is_dump_config) {
shellDumpConfig();
taos_cleanup();