规则引擎

规则引擎旨在为物联网解决方案提供实时分析功能,实现每秒对数百万事件的流式分析,帮助用户灵活地转发和处理设备消息, 用户可通过类SQL语言设定规则,对消息进行筛选、变型和转发,实现过滤数据和处理数据,并根据不同场景将数据无缝转发至不同的数据目的地,例如消息队列、jdbc、用户api、时序数据库等。

创建规则

菜单路径

主界面→产品与服务→物联网→规则引擎

操作步骤

1.登录浪潮云服务控制台。

2.点击“产品与服务”>“物联网”>“规则引擎”,进入规则引擎操作界面。

3.点击"+ 创建规则",进入创建规则页面,填写基本信息。 如表1所示。

表1 属性说明

属性 属性说明
规则名称 系统自动生成,用户可根据需要自行更改
规格 指运行spark应用的容器个数和资源大小,目前仅支持选择标准规格
数据来源 根据用户需要设置主题类型、项目、产品并填写主题名称,主题名称支持通配符
规则描述 用于描述标记该规则的用途等信息

数据来源需要设备有发布权限(用户有订阅权限),如果没有上述权限,规则将无法创建成功。

4.点击“确定”按钮,规则创建完成,返回“规则列表”页面,查看已经创建的规则。

编辑调试规则

菜单路径

主界面→产品与服务→物联网→规则引擎→规则列表→“编辑调试”按钮

操作步骤

1.登录浪潮云服务控制台。

2.点击“产品与服务”>“物联网”>“规则引擎”,进入规则列表面。

3.点击"编辑调试"按钮,进入规则编辑调试页面,设置详细信息,控制台提供了SQL填写的方式来简化SQL语句的生成,具体可参考SQL表达式

  • 查询字段:从消息中筛选出数据用于后续操作。系统支持多个字段的处理,在数据转发至消费者之前需要对数据格式进行调整。注意:如果想筛选timestamp,value等flink sql中的关键字,需要加上反引号,比如timestamp,value,具体可参考Flink关键字.
  • 查询函数:topic()、topic(1)、topic(2)、deviceName()。从数据来源读取信息并转化为数据转存至输出数据中,系统支持多个函数的处理。举例说明,原始消息数据格式为:
{
  "timestamp": 1560757257790,
  "state": {
    "reported": {
      "GeoLocation": "Australia",
      "temperature": 25,
      "_send": 22713,
      "_receive": 0,
      "k1": "home",
      "k2": "yellow",
      "humi": 28,
      "_delayMs": 0
    }
  },
  "clientToken": "e5a2edc6-6fbe-43b9-bfa8-10ae68999c1f"
}

(1)流向MQTT时,要求字段中必须包含注解字段,根据注解生成流向MQTT的数据格式。如果不加暗示,将查询字段组成简单的json流向mqtt。查询字段实例如下:

 /*+ json('timestamp', 'clienttoken', 'data.temp:temp', 'data.humi:humi' ,'data.k1:k1','device','topic') */ state.reported.humi as humi,state.reported.k1 as k1, state.reported.temperature as temp, `timestamp` as `timestamp`, clientToken as clienttoken,topic() as topic,deviceName() as device

点击“测试”按钮,原始数据经过上述查询字段转换后输出数据为:

{
  "data": {
    "temp": 25,
    "k1": "home",
    "humi": 28
  },
  "clienttoken": "e5a2edc6-6fbe-43b9-bfa8-10ae68999c1f",
  "topic": "iot/iotol9592xu/21m0kg2y/device20/shadow/update",
  "device": "device20",
  "timestamp": 1560757257790
}

(2)流向jdbc时,字段必须与待流入数据表的字段相匹配,可根据数据流入填写的jdbc数据库配置信息进行修改。要求筛选出来的字段顺序与insert语句中的字段顺序必须相同。如下为流入jdbc的实例:

     state.reported.k1 as k1, state.reported.temperature as temp, deviceName() as device,`timestamp` as `timestamp`

原始数据经过上述查询字段转换后数据:

{
  "temp": 25,
  "k1": "home",
  "device": "device20",
  "timestamp": 1560757257790
}

(3)流向用户api时,可根据用户需求设置相应字段。如下为流入用户api的实例:

     `timestamp` as `timestamp`,'temperature' As metric, state.reported.temperature as _value,  clientToken as tag, topic() as topic,deviceName() as device

原始数据经过上述查询字段转换后数据:

{
  "metric": "temperature",
  "topic": "iot/iotol9592xu/21m0kg2y/device20/shadow/update",
  "_value": 25,
  "tag": "e5a2edc6-6fbe-43b9-bfa8-10ae68999c1f",
  "device": "device20",
  "timestamp": 1560757257790
}

(4)流向tsdb时,要求必须包含timestamp和tags字段,以及要筛选的字段和值,并以字段名_value的形式作为筛选值的字段,例如,要查询 温度(temperature),则查询字段中必须含有temperature和temperature_value两个字段。tags字段必须是Map形式。查询字段实例如下:

     'temperature' as temperature,`timestamp` as `timestamp`,state.reported.temperature as temperature_value, MAP['token',state.reported.k1] as tags

原始数据经过上述查询字段转换后数据:

{
  "temperature_value": 25,
  "temperature": "temperature",
  "timestamp": 1560757257790,
  "tags": {
    "token": "home"
  }
}
  • 约束条件:后续操作的触发条件。缺省情况下,转发每一条消息至目的地。例如state.reported.temperature > 50

  • 消息模板:Json格式消息示例,用于spark针对此类消息定义schema,在流处理过程中执行类SQL操作,目的是为了创建表结构,缓存设备上报的数据。可从订阅主题中随机获取某条消息。消息示例中消息schema需要对应查询字段。

  • 数据目的地:规则引擎目前支持将数据转发到消息队列、jdbc、用户api、时序数据库,点击“+添加数据目的地”按钮可进行添加,目前仅支持添加一个数据目的地,相关属性设置如下各表所示。

(1)MQTT

属性 属性说明
主题类型 设置主题类型,目前可选系统主题和自定义主题
项目 设置项目名称
产品 根据用户需要设置具体产品
Topic 即主题名称,支持通配符

数据目的地为mqtt时,要求设备有订阅该主题的权限,如果权限,添加数据目的地将不能成功。topic支持变量的形式,如${device}

(2)jdbc

属性 属性说明
数据库类型 设置数据库的类型,目前支持mysql
数据库地址 设置数据库地址
端口 设置数据库端口
数据库名称 设置数据库名称
字符集 设置数据库字符集
用户名 设置数据库用户名
密码 设置数据库用户密码
表名称 设置数据表名称
写入字段 填写需要写入的字段

要求:多个写入字段需要用英文的","隔开,写入字段的顺序与前面筛选出来字段的顺序必须要严格一致。

(3)用户api,输入可以获取并处理数据的api接口,该接口的请求方式需为POST,数据将在requestbody中以json字符串的形式逐条传送。

属性 属性说明
api地址 用户接受数据的API地址,支持http/https,可拼接参数。例:https://www.inspur.cloud.com/api/test?user=zhangsan&state=1

(4)tsdb

属性 属性说明
数据库名称 设置流向的时序数据库名称

示例页面如下图所示,

属性设置完成后,点击“确定”按钮,添加数据目的地成功,返回编辑调试页面,可查看数据目的地。

可对目的地进行编辑和删除,编辑的时候目的地类型不允许改变。

4.点击"提交"按钮,规则编辑调试完成,返回“规则列表”页面。

基本操作

列表中的基本操作如表2

表2 基本操作

操作名称 操作 详解
编辑调试 进入规则编辑调试页面,编辑规则。 规则名称、规则描述、数据来源、查询字段、约束条件、消息模板、数据目的地均可修改。
启用 提交流式处理任务。
停用 停止所执行的流式处理任务。
删除 删除规则。

SQL表达式

建立规则时,可以编写SQL来解析和处理数据。JSON数据可以映射为虚拟的表,其中Key对应表的列,Value对应列值,这样就可以使用SQL处理。

SELECT

SELECT语句中的字段,可以使用JSON中的键值,也可以使用SQL内置的函数,规则引擎支持的常用函数见表3
支持 * 和函数的组合。不支持SQL子查询。 上报的JSON数据格式,可以是数组或者嵌套的JSON,SQL语句支持使用JSONPath获取其中的属性值,如对于{a:{key1:v1, key2:v2}},可以通过a.key2 获取到值v2。使用变量时,需要注意单双引号区别:单引号表示常量,双引号或不加引号表示变量。如使用单引号'a.key2',值为a.key2。
支持函数查询。支持的函数有:topic()、topic(1)、topic(2)、deviceName()。如对于数据来源:iot/iotol9592xu/21m0kg2y/device20/shadow/update,可以通过topic()获取到值iot/iotol9592xu/21m0kg2y/device20/shadow/update,通过topic(1)获取到值iot,通过topic(2)获取到值iotol9592xu,通过deviceName()获取到值device20。
支持暗示的数据格式,比如/*+ json('timestamp', 'clienttoken', 'data.temp:temp', 'data.humi:humi' ,'data.k1:k1')*/,暗示用于数据目的地为物接入主题的情况,目的是将筛选出来的数据按照暗示给出的json格式进行转化,并存入目的主题。

表3 函数列表

函数名 说明 示例
CURRENT_TIMESTAMP 获取系统当前时间 CURRENT_TIMESTAMP
unix_timestamp 返回当前的 Unix 系统时间戳,秒为单位 unix_timestamp()
abs 求绝对值 abs(data)
floor 向下取整 floor(number)
ceil 向上取整 ceil(number)
length 求字符串长度 length(deviceid)
pmod 取模 pmod(m, n)
power 返回n的m次幂 power(2, 10)
upper 返回大写字符 upper(string)
topic 获取数据来源 topic() topic(1),topic(2)分别获取第一字段和第二字段。
deviceName 获取设备名称 deviceName()

FROM

FROM Topic 即该规则指定的数据来源。

WHERE

规则触发条件,条件表达式。不支持子SQL查询。WHERE中可以使用的字段和SELECT语句一致,当接收到对应Topic的消息时,WHERE语句的结果会作为是否触发规则的判断条件。具体条件表达式列表见表4

表4 条件表达式

操作符 描述 示例
= 相等 color='red'
<> 不等于 color<>'red'
AND 逻辑与 color='red' AND state='running'
OR 逻辑或 color='red' OR state='running'
() 括号代表一个整体 color='red' AND (state='running' OR state='ready')
+ 算数加 4+5
- 算数减 5-4
/ 20/5
* 4*5
% 取余数 11%2
< 小于 5<6
<= 小于或等于 5<=6
> 大于 5>4
>= 大于或等于 5>=4
CASE...WHEN...
THEN...ELSE...END
Case表达式 CASE col WHEN 1 THEN ‘Y’ WHEN 0 THEN ‘N’ ELSE ‘’ END as flag
IN 仅支持枚举,不支持子查询 比如 WHERE a IN(1,2,3); 不支持以下形式: WHERE a IN(select xxx)

Flink关键字

以下字符串组合已经被Flink保留为关键字。如果您想使用以下字符串作为字段名称,请在关键字两端添加`,例如`value`。

  A,ABS,ABSOLUTE,ACTION,ADA,ADD,ADMIN,AFTER,ALL,ALLOCATE,ALLOW,ALTER,ALWAYS,AND,ANY,ARE,ARRAY,AS,ASC,ASENSITIVE,ASSERTION,ASSIGNMENT,ASYMMETRIC,AT,ATOMIC,ATTRIBUTE,ATTRIBUTES,AUTHORIZATION,AVG,BEFORE,BEGIN,BERNOULLI,BETWEEN,BIGINT,BINARY,BIT,BLOB,BOOLEAN,BOTH,BREADTH,BY,C,CALL,CALLED,CARDINALITY,CASCADE,CASCADED,CASE,CAST,CATALOG,CATALOG_NAME,CEIL,CEILING,CENTURY,CHAIN,CHAR,CHARACTER,CHARACTERISTICTS,CHARACTERS,CHARACTER_LENGTH,CHARACTER_SET_CATALOG,CHARACTER_SET_NAME,CHARACTER_SET_SCHEMA,CHAR_LENGTH,CHECK,CLASS_ORIGIN,CLOB,CLOSE,COALESCE,COBOL,COLLATE,COLLATION,COLLATION_CATALOG,COLLATION_NAME,COLLATION_SCHEMA,COLLECT,COLUMN,COLUMN_NAME,COMMAND_FUNCTION,COMMAND_FUNCTION_CODE,COMMIT,COMMITTED,CONDITION,CONDITION_NUMBER,CONNECT,CONNECTION,CONNECTION_NAME,CONSTRAINT,CONSTRAINTS,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONSTRUCTOR,CONTAINS,CONTINUE,CONVERT,CORR,CORRESPONDING,COUNT,COVAR_POP,COVAR_SAMP,CREATE,CROSS,CUBE,CUME_DIST,CURRENT,CURRENT_CATALOG,CURRENT_DATE,CURRENT_DEFAULT_TRANSFORM_GROUP,CURRENT_PATH,CURRENT_ROLE,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_TRANSFORM_GROUP_FOR_TYPE,CURRENT_USER,CURSOR,CURSOR_NAME,CYCLE,DATA,DATABASE,DATE,DATETIME_INTERVAL_CODE,DATETIME_INTERVAL_PRECISION,DAY,DEALLOCATE,DEC,DECADE,DECIMAL,DECLARE,DEFAULT,DEFAULTS,DEFERRABLE,DEFERRED,DEFINED,DEFINER,DEGREE,DELETE,DENSE_RANK,DEPTH,DEREF,DERIVED,DESC,DESCRIBE,DESCRIPTION,DESCRIPTOR,DETERMINISTIC,DIAGNOSTICS,DISALLOW,DISCONNECT,DISPATCH,DISTINCT,DOMAIN,DOUBLE,DOW,DOY,DROP,DYNAMIC,DYNAMIC_FUNCTION,DYNAMIC_FUNCTION_CODE,EACH,ELEMENT,ELSE,END,END-EXEC,EPOCH,EQUALS,ESCAPE,EVERY,EXCEPT,EXCEPTION,EXCLUDE,EXCLUDING,EXEC,EXECUTE,EXISTS,EXP,EXPLAIN,EXTEND,EXTERNAL,EXTRACT,FALSE,FETCH,FILTER,FINAL,FIRST,FIRST_VALUE,FLOAT,FLOOR,FOLLOWING,FOR,FOREIGN,FORTRAN,FOUND,FRAC_SECOND,FREE,FROM,FULL,FUNCTION,FUSION,G,GENERAL,GENERATED,GET,GLOBAL,GO,GOTO,GRANT,GRANTED,GROUP,GROUPING,HAVING,HIERARCHY,HOLD,HOUR,IDENTITY,IMMEDIATE,IMPLEMENTATION,IMPORT,IN,INCLUDING,INCREMENT,INDICATOR,INITIALLY,INNER,INOUT,INPUT,INSENSITIVE,INSERT,INSTANCE,INSTANTIABLE,INT,INTEGER,INTERSECT,INTERSECTION,INTERVAL,INTO,INVOKER,IS,ISOLATION,JAVA,JOIN,K,KEY,KEY_MEMBER,KEY_TYPE,LABEL,LANGUAGE,LARGE,LAST,LAST_VALUE,LATERAL,LEADING,LEFT,LENGTH,LEVEL,LIBRARY,LIKE,LIMIT,LN,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCATOR,LOWER,M,MAP,MATCH,MATCHED,MAX,MAXVALUE,MEMBER,MERGE,MESSAGE_LENGTH,MESSAGE_OCTET_LENGTH,MESSAGE_TEXT,METHOD,MICROSECOND,MILLENNIUM,MIN,MINUTE,MINVALUE,MOD,MODIFIES,MODULE,MONTH,MORE,MULTISET,MUMPS,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NCLOB,NESTING,NEW,NEXT,NO,NONE,NORMALIZE,NORMALIZED,NOT,NULL,NULLABLE,NULLIF,NULLS,NUMBER,NUMERIC,OBJECT,OCTETS,OCTET_LENGTH,OF,OFFSET,OLD,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,ORDERING,ORDINALITY,OTHERS,OUT,OUTER,OUTPUT,OVER,OVERLAPS,OVERLAY,OVERRIDING,PAD,PARAMETER,PARAMETER_MODE,PARAMETER_NAME,PARAMETER_ORDINAL_POSITION,PARAMETER_SPECIFIC_CATALOG,PARAMETER_SPECIFIC_NAME,PARAMETER_SPECIFIC_SCHEMA,PARTIAL,PARTITION,PASCAL,PASSTHROUGH,PATH,PERCENTILE_CONT,PERCENTILE_DISC,PERCENT_RANK,PLACING,PLAN,PLI,POSITION,POWER,PRECEDING,PRECISION,PREPARE,PRESERVE,PRIMARY,PRIOR,PRIVILEGES,PROCEDURE,PUBLIC,QUARTER,RANGE,RANK,READ,READS,REAL,RECURSIVE,REF,REFERENCES,REFERENCING,REGR_AVGX,REGR_AVGY,REGR_COUNT,REGR_INTERCEPT,REGR_R2,REGR_SLOPE,REGR_SXX,REGR_SXY,REGR_SYY,RELATIVE,RELEASE,REPEATABLE,RESET,RESTART,RESTRICT,RESULT,RETURN,RETURNED_CARDINALITY,RETURNED_LENGTH,RETURNED_OCTET_LENGTH,RETURNED_SQLSTATE,RETURNS,REVOKE,RIGHT,ROLE,ROLLBACK,ROLLUP,ROUTINE,ROUTINE_CATALOG,ROUTINE_NAME,ROUTINE_SCHEMA,ROW,ROWS,ROW_COUNT,ROW_NUMBER,SAVEPOINT,SCALE,SCHEMA,SCHEMA_NAME,SCOPE,SCOPE_CATALOGS,SCOPE_NAME,SCOPE_SCHEMA,SCROLL,SEARCH,SECOND,SECTION,SECURITY,SELECT,SELF,SENSITIVE,SEQUENCE,SERIALIZABLE,SERVER,SERVER_NAME,SESSION,SESSION_USER,SET,SETS,SIMILAR,SIMPLE,SIZE,SMALLINT,SOME,SOURCE,SPACE,SPECIFIC,SPECIFICTYPE,SPECIFIC_NAME,SQL,SQLEXCEPTION,SQLSTATE,SQLWARNING,SQL_TSI_DAY,SQL_TSI_FRAC_SECOND,SQL_TSI_HOUR,SQL_TSI_MICROSECOND,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQRT,START,STATE,STATEMENT,STATIC,STDDEV_POP,STDDEV_SAMP,STREAM,STRUCTURE,STYLE,SUBCLASS_ORIGIN,SUBMULTISET,SUBSTITUTE,SUBSTRING,SUM,SYMMETRIC,SYSTEM,SYSTEM_USER,TABLE,TABLESAMPLE,TABLE_NAME,TEMPORARY,THEN,TIES,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMEZONE_HOUR,TIMEZONE_MINUTE,TINYINT,TO,TOP_LEVEL_COUNT,TRAILING,TRANSACTION,TRANSACTIONS_ACTIVE,TRANSACTIONS_COMMITTED,TRANSACTIONS_ROLLED_BACK,TRANSFORM,TRANSFORMS,TRANSLATE,TRANSLATION,TREAT,TRIGGER,TRIGGER_CATALOG,TRIGGER_NAME,TRIGGER_SCHEMA,TRIM,TRUE,TYPE,UESCAPE,UNBOUNDED,UNCOMMITTED,UNDER,UNION,UNIQUE,UNKNOWN,UNNAMED,UNNEST,UPDATE,UPPER,UPSERT,USAGE,USER,USER_DEFINED_TYPE_CATALOG,USER_DEFINED_TYPE_CODE,USER_DEFINED_TYPE_NAME,USER_DEFINED_TYPE_SCHEMA,USING,VALUE,VALUES,VARBINARY,VARCHAR,VARYING,VAR_POP,VAR_SAMP,VERSION,VIEW,WEEK,WHEN,WHENEVER,WHERE,WIDTH_BUCKET,WINDOW,WITH,WITHIN,WITHOUT,WORK,WRAPPER,WRITE,XML,YEAR,ZONE