Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/qnode2

This commit is contained in:
dapan1121 2022-01-17 18:20:55 +08:00
commit 16800098b4
9 changed files with 87 additions and 27 deletions

View File

@ -106,7 +106,13 @@ void scheduleFreeJob(void *pJob);
void schedulerDestroy(void);
int32_t schedulerGenerateTaskList(SQueryDag* pDag, SArray **pTasks);
/**
* convert dag to task list
* @param pDag
* @param pTasks SArray**<STaskInfo>
* @return
*/
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks);
void schedulerFreeTaskList(SArray *taskList);

View File

@ -49,7 +49,7 @@ int main(int argc, char** argv) {
TEST(testCase, driverInit_Test) { taos_init(); }
TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0);
if (pConn == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
}

View File

@ -47,6 +47,11 @@ enum {
CTG_RENT_STABLE,
};
typedef struct SCTGDebug {
int32_t lockDebug;
} SCTGDebug;
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo
@ -134,20 +139,22 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
@ -155,15 +162,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
qDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)

View File

@ -20,6 +20,9 @@
SCatalogMgmt ctgMgmt = {0};
SCTGDebug gCTGDebug = {0};
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
if (NULL == pCatalog->dbCache.cache) {
*inCache = false;

View File

@ -30,7 +30,8 @@ static void copyString(const cJSON* json, const char* name, char* dst) {
}
static int64_t getNumber(const cJSON* json, const char* name) {
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
return (int64_t) d;
}
static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) {
@ -541,17 +542,27 @@ static const char* jkTimeWindowEndKey = "EndKey";
static bool timeWindowToJson(const void* obj, cJSON* json) {
const STimeWindow* win = (const STimeWindow*)obj;
bool res = cJSON_AddNumberToObject(json, jkTimeWindowStartKey, win->skey);
char tmp[32] = {0};
sprintf(tmp, "%"PRId64, win->skey);
bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp);
if (res) {
res = cJSON_AddNumberToObject(json, jkTimeWindowEndKey, win->ekey);
memset(tmp, 0, tListLen(tmp));
sprintf(tmp, "%"PRId64, win->ekey);
res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp);
}
return res;
}
static bool timeWindowFromJson(const cJSON* json, void* obj) {
STimeWindow* win = (STimeWindow*)obj;
win->skey = getNumber(json, jkTimeWindowStartKey);
win->ekey = getNumber(json, jkTimeWindowEndKey);
char* p = getString(json, jkTimeWindowStartKey);
win->skey = strtoll(p, NULL, 10);
p = getString(json, jkTimeWindowEndKey);
win->ekey = strtoll(p, NULL, 10);
return true;
}

View File

@ -73,6 +73,10 @@ enum {
QW_ADD_ACQUIRE,
};
typedef struct SQWDebug {
int32_t lockDebug;
} SQWDebug;
typedef struct SQWMsg {
void *node;
char *msg;
@ -142,9 +146,6 @@ typedef struct SQWorkerMgmt {
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_GOT_RES_DATA(data) (true)
#define QW_LOW_RES_DATA(data) (false)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
@ -169,21 +170,22 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)

View File

@ -9,6 +9,8 @@
#include "tname.h"
#include "dataSinkMgt.h"
SQWDebug gQWDebug = {0};
int32_t qwValidateStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
@ -432,7 +434,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
locked = false;
@ -938,6 +940,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo);
ctx->sinkId = sOutput.scheduleJobNo;
QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection));
}
} else if ((!sOutput.queryEnd) && (/* DS_BUF_LOW == sOutput.bufStatus || */ DS_BUF_EMPTY == sOutput.bufStatus)) {

View File

@ -324,7 +324,13 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg};
QW_SCH_TASK_DLOG("processQuery start");
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processQuery end");
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
@ -351,7 +357,13 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processCQuery start");
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processCQuery end");
return TSDB_CODE_SUCCESS;
}
@ -394,7 +406,11 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_SCH_TASK_DLOG("processReady start");
QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg));
QW_SCH_TASK_DLOG("processReady end");
return TSDB_CODE_SUCCESS;
}
@ -446,7 +462,13 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processFetch start");
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processFetch end");
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
@ -498,7 +520,13 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processDrop start");
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
QW_SCH_TASK_DLOG("processDrop end");
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {

View File

@ -1374,7 +1374,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
return TSDB_CODE_SUCCESS;
}
int32_t schedulerGenerateTaskList(SQueryDag* pDag, SArray **pTasks) {
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}