Skip to content
Open
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ Although there are some other MySQL rivers for Elasticsearch, like [elasticsearc
## Todo

+ MySQL 8
+ ES 6
+ ES 6 (After verification (version 6.4.2), it is now supported. Delete and update are supported)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Em, are you sure we can support ES 6 directly? Seem ES 6 has already removed doc Type?

+ Statistic.

## Donate
Expand Down
133 changes: 50 additions & 83 deletions etc/river.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_addr = "mysqlhost:3306"
my_user = "root"
my_pass = ""
my_pass = "root"
my_charset = "utf8"

# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "127.0.0.1:9200"
es_addr = "eshost:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""
#es_user = ""
#es_pass = ""

# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
Expand All @@ -25,7 +25,7 @@ stat_addr = "127.0.0.1:12800"
server_id = 1001

# mysql or mariadb
flavor = "mysql"
flavor = "mariadb"

# mysqldump execution path
# if not set or empty, ignore mysqldump.
Expand All @@ -46,13 +46,13 @@ skip_no_pk_table = false

# MySQL data source
[[source]]
schema = "test"
schema = "nfvofcaps"

# Only below tables will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
# I don't think it is necessary to sync all tables in a database.
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]

#tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
tables = ["FMALARM"]
# Below is for special rule mapping

# Very simple example
Expand All @@ -67,82 +67,49 @@ tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
#
# The table `t` will be synced to ES index `test` and type `t`.
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"

# Wildcard table rule, the wildcard table must be in source tables
# All tables which match the wildcard format will be synced to ES index `test` and type `t`.
# In this example, all tables must have same schema with above table `t`;
[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"

# Simple field rule
#
# desc tfield;
# +----------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | tags | varchar(256) | YES | | NULL | |
# | keywords | varchar(256) | YES | | NULL | |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfield"
index = "test"
type = "tfield"
schema = "nfvofcaps"
table = "FMALARM"
index = "nfvomysql"
type = "mysqltable"

# The es doc's id will be `id`:`tag`
# It is useful for merge muliple table into one type while theses tables have same PK
id = ["ID","ALARMID"]


[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
#id="es_id"
# Map column `tags` to ES field `es_tags` with array type
tags="es_tags,list"
#tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"

# Filter rule
#
# desc tfilter;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | c1 | int(11) | YES | | 0 | |
# | c2 | int(11) | YES | | 0 | |
# | name | varchar(256) | YES | | NULL | |
# +-------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfilter"
index = "test"
type = "tfilter"

# Only sync following columns
filter = ["id", "name"]

# id rule
#
# desc tid_[0-9]{4};
# +----------+--------------+------+-----+---------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id | int(11) | NO | PRI | NULL | |
# | tag | varchar(256) | YES | | NULL | |
# | desc | varchar(256) | YES | | NULL | |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tid_[0-9]{4}"
index = "test"
type = "t"
# The es doc's id will be `id`:`tag`
# It is useful for merge muliple table into one type while theses tables have same PK
id = ["id", "tag"]
#keywords=",list"


ORIGIN="origin"
ID="id"
ALARMID="alarmid"
ALARMTITLE="alarmtitle"
ALARMSTATUS="alarmstatus"
ORIGSEVERITY="origseverity"
ALARMTYPE="alarmtype"
EVENTTIME="eventtime,date"
MSGSEQ="msgseq"
CLEARTIME="cleartime,date"
CLEARFLAG="clearflag"
CLEARMSGSEQ="clearmsgseq"
SPECIFICPROBLEMID="specificproblemid"
SPECIFICPROBLEM="specificproblem"
NEUID="neuid"
NENAME="nename"
NETYPE="netype"
OBJECTUID="objectuid"
OBJECTNAME="objectname"
OBJECTTYPE="objecttype"
LOCATIONINFO="locationinfo"
ADDINFO="addinfo"
PVFLAG="pvflag"
CONFIRMFLAG="confirmflag"
CONFIRMTIME="confirmtime,date"
REMARK="remark"
REMARKTIME="remarktime,date"
10 changes: 9 additions & 1 deletion river/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,15 @@ func (r *River) getFieldValue(col *schema.TableColumn, fieldType string, value i
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
fieldValue = r.makeReqColumnData(col, time.Unix(v.Int(), 0).Format(mysql.TimeFormat))
}
}
} else if col.Type == schema.TYPE_STRING {
v := r.makeReqColumnData(col, value)
str, _ := v.(string)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch v.(type) {
    case string:
    case []byte:
    default:
}

I think you can check string and []byte here.

stamp, _ := time.ParseInLocation(time.RFC3339 , str, time.Local)
t := int64(stamp.Unix())
col.Type = schema.TYPE_DATETIME
fieldValue = r.makeReqColumnData(col, time.Unix(t, 0).Format(mysql.TimeFormat))

}
}

if fieldValue == nil {
Expand Down