refactor(stream)
This commit is contained in:
parent
6e7e2b26f7
commit
a6015d2f39
|
@ -2574,6 +2574,14 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
|
|||
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
void* data;
|
||||
} SStreamDispatchReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t status;
|
||||
} SStreamDispatchRsp;
|
||||
|
||||
#define TD_AUTO_CREATE_TABLE 0x1
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
|
@ -29,8 +30,22 @@ extern "C" {
|
|||
typedef struct SStreamTask SStreamTask;
|
||||
|
||||
enum {
|
||||
STREAM_TASK_STATUS__RUNNING = 1,
|
||||
STREAM_TASK_STATUS__STOP,
|
||||
TASK_STATUS__IDLE = 1,
|
||||
TASK_STATUS__EXECUTING,
|
||||
TASK_STATUS__CLOSING,
|
||||
};
|
||||
|
||||
enum {
|
||||
TASK_INPUT_STATUS__NORMAL = 1,
|
||||
TASK_INPUT_STATUS__BLOCKED,
|
||||
TASK_INPUT_STATUS__RECOVER,
|
||||
TASK_INPUT_STATUS__STOP,
|
||||
};
|
||||
|
||||
enum {
|
||||
TASK_OUTPUT_STATUS__NORMAL = 1,
|
||||
TASK_OUTPUT_STATUS__WAIT,
|
||||
TASK_OUTPUT_STATUS__BLOCKED,
|
||||
};
|
||||
|
||||
enum {
|
||||
|
@ -38,10 +53,64 @@ enum {
|
|||
STREAM_CREATED_BY__SMA,
|
||||
};
|
||||
|
||||
enum {
|
||||
STREAM_INPUT__DATA_SUBMIT = 1,
|
||||
STREAM_INPUT__DATA_BLOCK,
|
||||
STREAM_INPUT__CHECKPOINT,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int32_t nodeId; // 0 for snode
|
||||
SEpSet epSet;
|
||||
} SStreamTaskEp;
|
||||
int8_t type;
|
||||
|
||||
int32_t sourceVg;
|
||||
int64_t sourceVer;
|
||||
|
||||
int32_t* dataRef;
|
||||
SSubmitReq* data;
|
||||
} SStreamDataSubmit;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
|
||||
int32_t sourceVg;
|
||||
int64_t sourceVer;
|
||||
|
||||
SArray* blocks; // SArray<SSDataBlock*>
|
||||
} SStreamDataBlock;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
} SStreamCheckpoint;
|
||||
|
||||
static FORCE_INLINE SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
|
||||
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosMemoryCalloc(1, sizeof(SStreamDataSubmit));
|
||||
if (pDataSubmit == NULL) return NULL;
|
||||
pDataSubmit->data = pReq;
|
||||
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
|
||||
if (pDataSubmit->data == NULL) goto FAIL;
|
||||
*pDataSubmit->dataRef = 1;
|
||||
return pDataSubmit;
|
||||
FAIL:
|
||||
taosMemoryFree(pDataSubmit);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
|
||||
//
|
||||
atomic_add_fetch_32(pDataSubmit->dataRef, 1);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
|
||||
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
|
||||
ASSERT(ref >= 0);
|
||||
if (ref == 0) {
|
||||
taosMemoryFree(pDataSubmit->data);
|
||||
taosMemoryFree(pDataSubmit->dataRef);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
|
||||
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
||||
|
||||
typedef struct {
|
||||
void* inputHandle;
|
||||
|
@ -122,9 +191,15 @@ enum {
|
|||
TASK_SINK__FETCH,
|
||||
};
|
||||
|
||||
enum {
|
||||
TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
|
||||
TASK_INPUT_TYPE__DATA_BLOCK,
|
||||
};
|
||||
|
||||
struct SStreamTask {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int8_t inputType;
|
||||
int8_t status;
|
||||
|
||||
int8_t sourceType;
|
||||
|
@ -155,9 +230,11 @@ struct SStreamTask {
|
|||
STaskDispatcherShuffle shuffleDispatcher;
|
||||
};
|
||||
|
||||
// msg buffer
|
||||
int32_t memUsed;
|
||||
int8_t inputStatus;
|
||||
int8_t outputStatus;
|
||||
|
||||
STaosQueue* inputQ;
|
||||
STaosQueue* outputQ;
|
||||
|
||||
// application storage
|
||||
void* ahandle;
|
||||
|
@ -199,10 +276,16 @@ typedef struct {
|
|||
SArray* res; // SArray<SSDataBlock>
|
||||
} SStreamSinkReq;
|
||||
|
||||
int32_t streamEnqueueData(SStreamTask* pTask, const void* input, int32_t inputType);
|
||||
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
|
||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
||||
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
||||
|
||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
|
||||
|
||||
int32_t streamTaskExecNew(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -23,92 +23,92 @@ extern "C" {
|
|||
// If the error is in a third-party library, place this header file under the third-party library header file.
|
||||
// When you want to use this feature, you should find or add the same function in the following section.
|
||||
#ifndef ALLOW_FORBID_FUNC
|
||||
#define __atomic_load_n __ATOMIC_LOAD_N_FUNC_TAOS_FORBID
|
||||
#define __atomic_store_n __ATOMIC_STORE_N_FUNC_TAOS_FORBID
|
||||
#define __atomic_exchange_n __ATOMIC_EXCHANGE_N_FUNC_TAOS_FORBID
|
||||
#define __sync_val_compare_and_swap __SYNC_VAL_COMPARE_AND_SWAP_FUNC_TAOS_FORBID
|
||||
#define __atomic_add_fetch __ATOMIC_ADD_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_add __ATOMIC_FETCH_ADD_FUNC_TAOS_FORBID
|
||||
#define __atomic_sub_fetch __ATOMIC_SUB_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_sub __ATOMIC_FETCH_SUB_FUNC_TAOS_FORBID
|
||||
#define __atomic_and_fetch __ATOMIC_AND_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_and __ATOMIC_FETCH_AND_FUNC_TAOS_FORBID
|
||||
#define __atomic_or_fetch __ATOMIC_OR_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_or __ATOMIC_FETCH_OR_FUNC_TAOS_FORBID
|
||||
#define __atomic_xor_fetch __ATOMIC_XOR_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_xor __ATOMIC_FETCH_XOR_FUNC_TAOS_FORBID
|
||||
#define __atomic_load_n __ATOMIC_LOAD_N_FUNC_TAOS_FORBID
|
||||
#define __atomic_store_n __ATOMIC_STORE_N_FUNC_TAOS_FORBID
|
||||
#define __atomic_exchange_n __ATOMIC_EXCHANGE_N_FUNC_TAOS_FORBID
|
||||
#define __sync_val_compare_and_swap __SYNC_VAL_COMPARE_AND_SWAP_FUNC_TAOS_FORBID
|
||||
#define __atomic_add_fetch __ATOMIC_ADD_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_add __ATOMIC_FETCH_ADD_FUNC_TAOS_FORBID
|
||||
#define __atomic_sub_fetch __ATOMIC_SUB_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_sub __ATOMIC_FETCH_SUB_FUNC_TAOS_FORBID
|
||||
#define __atomic_and_fetch __ATOMIC_AND_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_and __ATOMIC_FETCH_AND_FUNC_TAOS_FORBID
|
||||
#define __atomic_or_fetch __ATOMIC_OR_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_or __ATOMIC_FETCH_OR_FUNC_TAOS_FORBID
|
||||
#define __atomic_xor_fetch __ATOMIC_XOR_FETCH_FUNC_TAOS_FORBID
|
||||
#define __atomic_fetch_xor __ATOMIC_FETCH_XOR_FUNC_TAOS_FORBID
|
||||
#endif
|
||||
|
||||
int8_t atomic_load_8(int8_t volatile *ptr);
|
||||
int8_t atomic_load_8(int8_t volatile *ptr);
|
||||
int16_t atomic_load_16(int16_t volatile *ptr);
|
||||
int32_t atomic_load_32(int32_t volatile *ptr);
|
||||
int64_t atomic_load_64(int64_t volatile *ptr);
|
||||
void* atomic_load_ptr(void *ptr);
|
||||
void atomic_store_8(int8_t volatile *ptr, int8_t val);
|
||||
void atomic_store_16(int16_t volatile *ptr, int16_t val);
|
||||
void atomic_store_32(int32_t volatile *ptr, int32_t val);
|
||||
void atomic_store_64(int64_t volatile *ptr, int64_t val);
|
||||
void atomic_store_ptr(void *ptr, void *val);
|
||||
int8_t atomic_exchange_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_load_ptr(void *ptr);
|
||||
void atomic_store_8(int8_t volatile *ptr, int8_t val);
|
||||
void atomic_store_16(int16_t volatile *ptr, int16_t val);
|
||||
void atomic_store_32(int32_t volatile *ptr, int32_t val);
|
||||
void atomic_store_64(int64_t volatile *ptr, int64_t val);
|
||||
void atomic_store_ptr(void *ptr, void *val);
|
||||
int8_t atomic_exchange_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_exchange_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_exchange_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_exchange_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_exchange_ptr(void *ptr, void *val);
|
||||
int8_t atomic_val_compare_exchange_8(int8_t volatile *ptr, int8_t oldval, int8_t newval);
|
||||
void *atomic_exchange_ptr(void *ptr, void *val);
|
||||
int8_t atomic_val_compare_exchange_8(int8_t volatile *ptr, int8_t oldval, int8_t newval);
|
||||
int16_t atomic_val_compare_exchange_16(int16_t volatile *ptr, int16_t oldval, int16_t newval);
|
||||
int32_t atomic_val_compare_exchange_32(int32_t volatile *ptr, int32_t oldval, int32_t newval);
|
||||
int64_t atomic_val_compare_exchange_64(int64_t volatile *ptr, int64_t oldval, int64_t newval);
|
||||
void* atomic_val_compare_exchange_ptr(void *ptr, void *oldval, void *newval);
|
||||
int8_t atomic_add_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_val_compare_exchange_ptr(void *ptr, void *oldval, void *newval);
|
||||
int8_t atomic_add_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_add_fetch_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_add_fetch_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_add_fetch_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_add_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_add_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_add_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_add_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_fetch_add_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_fetch_add_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_fetch_add_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_fetch_add_ptr(void *ptr, void *val);
|
||||
int8_t atomic_sub_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_fetch_add_ptr(void *ptr, void *val);
|
||||
int8_t atomic_sub_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_sub_fetch_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_sub_fetch_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_sub_fetch_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_sub_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_sub_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_sub_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_sub_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_fetch_sub_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_fetch_sub_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_fetch_sub_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_fetch_sub_ptr(void *ptr, void *val);
|
||||
int8_t atomic_and_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_fetch_sub_ptr(void *ptr, void *val);
|
||||
int8_t atomic_and_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_and_fetch_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_and_fetch_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_and_fetch_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_and_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_and_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_and_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_and_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_fetch_and_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_fetch_and_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_fetch_and_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_fetch_and_ptr(void *ptr, void *val);
|
||||
int8_t atomic_or_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_fetch_and_ptr(void *ptr, void *val);
|
||||
int8_t atomic_or_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_or_fetch_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_or_fetch_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_or_fetch_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_or_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_or_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_or_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_or_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_fetch_or_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_fetch_or_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_fetch_or_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_fetch_or_ptr(void *ptr, void *val);
|
||||
int8_t atomic_xor_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_fetch_or_ptr(void *ptr, void *val);
|
||||
int8_t atomic_xor_fetch_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_xor_fetch_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_xor_fetch_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_xor_fetch_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_xor_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_xor_8(int8_t volatile *ptr, int8_t val);
|
||||
void *atomic_xor_fetch_ptr(void *ptr, void *val);
|
||||
int8_t atomic_fetch_xor_8(int8_t volatile *ptr, int8_t val);
|
||||
int16_t atomic_fetch_xor_16(int16_t volatile *ptr, int16_t val);
|
||||
int32_t atomic_fetch_xor_32(int32_t volatile *ptr, int32_t val);
|
||||
int64_t atomic_fetch_xor_64(int64_t volatile *ptr, int64_t val);
|
||||
void* atomic_fetch_xor_ptr(void *ptr, void *val);
|
||||
void *atomic_fetch_xor_ptr(void *ptr, void *val);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -17,6 +17,147 @@
|
|||
#include "mndDef.h"
|
||||
#include "mndConsumer.h"
|
||||
|
||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||
int32_t sz = 0;
|
||||
/*int32_t outputNameSz = 0;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
// TODO encode tasks
|
||||
if (pObj->tasks) {
|
||||
sz = taosArrayGetSize(pObj->tasks);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pArray = taosArrayGetP(pObj->tasks, i);
|
||||
int32_t innerSz = taosArrayGetSize(pArray);
|
||||
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pArray, j);
|
||||
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
#if 0
|
||||
if (pObj->ColAlias != NULL) {
|
||||
outputNameSz = taosArrayGetSize(pObj->ColAlias);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
|
||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||
char *name = taosArrayGetP(pObj->ColAlias, i);
|
||||
if (tEncodeCStr(pEncoder, name) < 0) return -1;
|
||||
}
|
||||
#endif
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||
pObj->tasks = NULL;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
if (sz != 0) {
|
||||
pObj->tasks = taosArrayInit(sz, sizeof(void *));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t innerSz;
|
||||
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
|
||||
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) return -1;
|
||||
if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
|
||||
taosArrayPush(pArray, &pTask);
|
||||
}
|
||||
taosArrayPush(pObj->tasks, &pArray);
|
||||
}
|
||||
}
|
||||
|
||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||
#if 0
|
||||
int32_t outputNameSz;
|
||||
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
|
||||
if (outputNameSz != 0) {
|
||||
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
|
||||
if (pObj->ColAlias == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||
char *name;
|
||||
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
|
||||
taosArrayPush(pObj->ColAlias, &name);
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
if (pVgEpNew == NULL) return NULL;
|
||||
pVgEpNew->vgId = pVgEp->vgId;
|
||||
pVgEpNew->qmsg = strdup(pVgEp->qmsg);
|
||||
pVgEpNew->epSet = pVgEp->epSet;
|
||||
return pVgEpNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
||||
if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
||||
tlen += taosEncodeString(buf, pVgEp->qmsg);
|
||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||
buf = taosDecodeString(buf, &pVgEp->qmsg);
|
||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
|
||||
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
|
||||
if (pConsumer == NULL) {
|
||||
|
@ -187,34 +328,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
if (pVgEpNew == NULL) return NULL;
|
||||
pVgEpNew->vgId = pVgEp->vgId;
|
||||
pVgEpNew->qmsg = strdup(pVgEp->qmsg);
|
||||
pVgEpNew->epSet = pVgEp->epSet;
|
||||
return pVgEpNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
||||
if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
||||
tlen += taosEncodeString(buf, pVgEp->qmsg);
|
||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||
buf = taosDecodeString(buf, &pVgEp->qmsg);
|
||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
||||
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||
if (pConsumerEpNew == NULL) return NULL;
|
||||
|
@ -413,119 +526,6 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
||||
int32_t sz = 0;
|
||||
/*int32_t outputNameSz = 0;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||
// TODO encode tasks
|
||||
if (pObj->tasks) {
|
||||
sz = taosArrayGetSize(pObj->tasks);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SArray *pArray = taosArrayGetP(pObj->tasks, i);
|
||||
int32_t innerSz = taosArrayGetSize(pArray);
|
||||
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pArray, j);
|
||||
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
|
||||
|
||||
#if 0
|
||||
if (pObj->ColAlias != NULL) {
|
||||
outputNameSz = taosArrayGetSize(pObj->ColAlias);
|
||||
}
|
||||
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
|
||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||
char *name = taosArrayGetP(pObj->ColAlias, i);
|
||||
if (tEncodeCStr(pEncoder, name) < 0) return -1;
|
||||
}
|
||||
#endif
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
|
||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||
pObj->tasks = NULL;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
if (sz != 0) {
|
||||
pObj->tasks = taosArrayInit(sz, sizeof(void *));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t innerSz;
|
||||
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
|
||||
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
if (pTask == NULL) return -1;
|
||||
if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
|
||||
taosArrayPush(pArray, &pTask);
|
||||
}
|
||||
taosArrayPush(pObj->tasks, &pArray);
|
||||
}
|
||||
}
|
||||
|
||||
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
|
||||
#if 0
|
||||
int32_t outputNameSz;
|
||||
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
|
||||
if (outputNameSz != 0) {
|
||||
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
|
||||
if (pObj->ColAlias == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < outputNameSz; i++) {
|
||||
char *name;
|
||||
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
|
||||
taosArrayPush(pObj->ColAlias, &name);
|
||||
}
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pOffset->key);
|
||||
|
|
|
@ -510,4 +510,4 @@ TEST_F(MndTestTrans2, 04_Conflict) {
|
|||
ASSERT_EQ(pUser, nullptr);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1031,3 +1031,37 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SStreamTaskExecReq req = {0};
|
||||
tDecodeSStreamTaskExecReq(msg, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
ASSERT(pTask);
|
||||
ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
|
||||
|
||||
// enqueue
|
||||
int32_t inputStatus = streamEnqueueDataBlk(pTask, (SStreamDataBlock*)req.data);
|
||||
if (inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO rsp blocked
|
||||
return 0;
|
||||
}
|
||||
|
||||
// try exec
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
if (streamTaskExecNew(pTask) < 0) {
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
||||
|
||||
goto FAIL;
|
||||
}
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO rsp success
|
||||
return 0;
|
||||
FAIL:
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,25 @@
|
|||
#include "tstream.h"
|
||||
#include "executor.h"
|
||||
|
||||
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI8(buf, pOutput->type);
|
||||
tlen += taosEncodeFixedI32(buf, pOutput->sourceVg);
|
||||
tlen += taosEncodeFixedI64(buf, pOutput->sourceVer);
|
||||
ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK);
|
||||
tlen += tEncodeDataBlocks(buf, pOutput->blocks);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
|
||||
buf = taosDecodeFixedI8(buf, &pInput->type);
|
||||
buf = taosDecodeFixedI32(buf, &pInput->sourceVg);
|
||||
buf = taosDecodeFixedI64(buf, &pInput->sourceVer);
|
||||
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
|
||||
buf = tDecodeDataBlocks(buf, &pInput->blocks);
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||
SStreamTaskExecReq req = {
|
||||
.streamId = pTask->streamId,
|
||||
|
@ -97,6 +116,226 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input) {
|
||||
ASSERT(pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK);
|
||||
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
|
||||
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
|
||||
streamDataSubmitRefInc(input);
|
||||
taosWriteQitem(pTask->inputQ, input);
|
||||
}
|
||||
return inputStatus;
|
||||
}
|
||||
|
||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
|
||||
ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
|
||||
taosWriteQitem(pTask->inputQ, input);
|
||||
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
|
||||
return inputStatus;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessTriggerReq(SStreamTask* pTask, SMsgCb* pMsgCb, char* msg, int32_t msgLen) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* pBlock, SRpcMsg* pRsp) {
|
||||
// 1. handle input
|
||||
// 1.1 enqueue
|
||||
taosWriteQitem(pTask->inputQ, pBlock);
|
||||
// 1.2 calc back pressure
|
||||
// 1.3 rsp by input status
|
||||
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
|
||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
||||
pCont->status = inputStatus;
|
||||
pRsp->pCont = pCont;
|
||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
// 2. try exec
|
||||
// 2.1. idle: exec
|
||||
// 2.2. executing: return
|
||||
// 2.3. closing: keep trying
|
||||
while (1) {
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
SArray* pRes = taosArrayInit(0, sizeof(void*));
|
||||
const SArray* blocks = pBlock->blocks;
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
// TODO: wrap destroy block
|
||||
taosArrayDestroyP(pBlock->blocks, (FDelete)blockDataDestroy);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
*resQ = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
}
|
||||
|
||||
} else if (execStatus == TASK_STATUS__CLOSING) {
|
||||
continue;
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING)
|
||||
break;
|
||||
else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
// 3. handle output
|
||||
// 3.1 check and set status
|
||||
// 3.2 dispatch / sink
|
||||
STaosQall* qall = taosAllocateQall();
|
||||
taosReadAllQitems(pTask->outputQ, qall);
|
||||
SArray** ppRes = NULL;
|
||||
while (1) {
|
||||
taosGetQitem(qall, (void**)&ppRes);
|
||||
if (ppRes == NULL) break;
|
||||
|
||||
SArray* pRes = *ppRes;
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->sourceVer, pRes);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||
} else {
|
||||
}
|
||||
|
||||
// dispatch
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t qType;
|
||||
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
|
||||
qType = FETCH_QUEUE;
|
||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
|
||||
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
|
||||
qType = MERGE_QUEUE;
|
||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
|
||||
qType = WRITE_QUEUE;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
||||
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg);
|
||||
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (pShuffleRes == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t sz = taosArrayGetSize(pRes);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
|
||||
SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
|
||||
if (pArray == NULL) {
|
||||
pArray = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pArray == NULL) {
|
||||
return -1;
|
||||
}
|
||||
taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
|
||||
}
|
||||
taosArrayPush(pArray, pDataBlock);
|
||||
}
|
||||
|
||||
if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
} else {
|
||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
}
|
||||
}
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, char* msg, int32_t msgLen) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskExecNew(SStreamTask* pTask) {
|
||||
SArray* pRes = NULL;
|
||||
if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) {
|
||||
// TODO remove multi runner
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
|
||||
int8_t status = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
if (status == TASK_STATUS__IDLE) {
|
||||
pRes = taosArrayInit(0, sizeof(void*));
|
||||
if (pRes == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* input = NULL;
|
||||
taosWriteQitem(pTask->inputQ, &input);
|
||||
if (input == NULL) return 0;
|
||||
|
||||
// TODO: fix type
|
||||
if (pTask->sourceType == TASK_SOURCE__SCAN) {
|
||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
|
||||
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
} else {
|
||||
SStreamDataBlock* pStreamBlock = (SStreamDataBlock*)input;
|
||||
const SArray* blocks = pStreamBlock->blocks;
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
// TODO: wrap destroy block
|
||||
taosArrayDestroyP(pStreamBlock->blocks, (FDelete)blockDataDestroy);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
*resQ = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
|
||||
SArray* pRes = NULL;
|
||||
// source
|
||||
|
@ -251,9 +490,17 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
|
|||
}
|
||||
pTask->taskId = tGenIdPI32();
|
||||
pTask->streamId = streamId;
|
||||
pTask->status = STREAM_TASK_STATUS__RUNNING;
|
||||
/*pTask->qmsg = NULL;*/
|
||||
pTask->status = TASK_STATUS__IDLE;
|
||||
|
||||
pTask->inputQ = taosOpenQueue();
|
||||
pTask->outputQ = taosOpenQueue();
|
||||
if (pTask->inputQ == NULL || pTask->outputQ == NULL) goto FAIL;
|
||||
return pTask;
|
||||
FAIL:
|
||||
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
|
||||
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
|
||||
if (pTask) taosMemoryFree(pTask);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
|
@ -349,10 +596,16 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
void tFreeSStreamTask(SStreamTask* pTask) {
|
||||
taosCloseQueue(pTask->inputQ);
|
||||
taosCloseQueue(pTask->outputQ);
|
||||
// TODO
|
||||
/*taosMemoryFree(pTask->qmsg);*/
|
||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||
for (int32_t i = 0; i < pTask->exec.numOfRunners; i++) {
|
||||
qDestroyTask(pTask->exec.runners[i].executor);
|
||||
}
|
||||
taosMemoryFree(pTask->exec.runners);
|
||||
/*taosMemoryFree(pTask->executor);*/
|
||||
/*taosMemoryFree(pTask);*/
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
|
|
@ -136,7 +136,7 @@ if $data35 != 3 then
|
|||
endi
|
||||
|
||||
sql insert into t1 values(1648791223001,12,14,13,11.1);
|
||||
sleep 100
|
||||
sleep 500
|
||||
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
|
||||
|
||||
if $rows != 4 then
|
||||
|
|
Loading…
Reference in New Issue