Adding Elasticsearch to a Legacy Application Using Logstash

Let’s add a twist to the above use case. Suppose we wish to index each user’s order details along with the user details in the same document.

1. Create an orders table in the MySQL DB and insert a few sample rows.
CREATE TABLE orders (
  orderid INT(6) AUTO_INCREMENT PRIMARY KEY,
  product VARCHAR(300) NOT NULL,
  description VARCHAR(300) NOT NULL,
  price INT(6),
  customerid INT(6),
  ordertime TIMESTAMP,
  FOREIGN KEY fk_userid(customerid) REFERENCES customer(id)
);
INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
VALUES (1, 'Tennis Ball', 'Wilson Australian Open', '330', '5','2019-01-22 20:21:49');
INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
VALUES (2, 'Head Xtra Damp Vibration Dampner', 'Dampens string vibration to reduce the risk of pain', '500', '4','2019-01-23 02:21:49');
INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
VALUES (3, 'HEAD Wristband Tennis 2.5" (White)', '80 % Cotton, 15% Nylon, 5 % Rubber (Elasthan)', '530', '3','2019-01-21 21:21:49');
INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
VALUES (4, 'YONEX VCORE Duel G 97 Alfa (290 g)', 'Head Size 97', '4780', '2','2019-01-22 14:21:49');
INSERT INTO `ecomdb`.`orders` (`orderid`, `product`, `description`, `price`, `customerid`,`ordertime`)
VALUES (5, 'Wilson Kaos Stroke - White & Black', 'Wilson Australian Open', '9000', '1','2019-01-25 03:53:49');

Now that the database is ready, and since we wish to index the order details as a nested JSON object in the same document as the user details, we will make use of a filter plugin provided by Logstash. Logstash supports a variety of filter plugins, and we can choose one according to our needs. Here we are going to use the Ruby filter, which lets us execute arbitrary Ruby code against each event.

A question that might have popped up in your mind is:

  • How will we get data from two different tables using the JDBC input plugin? We will use a join query.
select c.id as customerid, c.firstname, c.lastname, c.email, c.regdate,
       od.orderid, od.product, od.description, od.price, od.ordertime
from customer as c left join orders as od on c.id = od.customerid;

2. Write the Ruby code as per our requirement.

The Ruby code for manipulating our document is as follows:

# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
  orderid     = event.get("orderid")
  product     = event.get("product")
  description = event.get("description")
  price       = event.get("price")
  ordertime   = event.get("ordertime")

  orderDetails = {
    "orderid"     => orderid,
    "product"     => product,
    "description" => description,
    "price"       => price,
    "ordertime"   => ordertime
  }

  event.set('orderDetails', orderDetails)
  event.remove('orderid')
  event.remove('product')
  event.remove('description')
  event.remove('price')
  event.remove('ordertime')
  return [event]
end

Name the Ruby file sampleRuby.rb. A Ruby filter script must define a filter method, which accepts a Logstash event and must return an array of events. In the code above, we manipulate the event by building a hash of the order details and setting that hash as a new field on the event. We also remove the flat order fields, which are no longer required once the orderDetails hash has been added.
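Since the script is plain Ruby, it can be sanity-checked outside Logstash before wiring it into the pipeline. Below is a minimal sketch, assuming a file called quickCheck.rb placed next to sampleRuby.rb; the FakeEvent class is not the real LogStash::Event, it only stubs the get, set and remove calls our filter uses.

# quickCheck.rb - a standalone sanity check for sampleRuby.rb (sketch only).
# FakeEvent stubs just the parts of LogStash::Event that our script touches.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(key)
    @fields[key]
  end

  def set(key, value)
    @fields[key] = value
  end

  def remove(key)
    @fields.delete(key)
  end

  def to_hash
    @fields
  end
end

require_relative 'sampleRuby'   # defines the top-level filter(event) method

event = FakeEvent.new(
  "customerid"  => 5,
  "firstname"   => "Jimmy",
  "orderid"     => 1,
  "product"     => "Tennis Ball",
  "description" => "Wilson Australian Open",
  "price"       => 330,
  "ordertime"   => "2019-01-22 20:21:49"
)

filter(event)
p event.to_hash   # order fields are now nested under "orderDetails"

Running ruby quickCheck.rb should show the flat order fields replaced by a single orderDetails hash, which is exactly the shape we expect to see in Elasticsearch.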

3. Add the Ruby filter to the Logstash configuration file.

The new version of the logstash-sample.conf file looks as follows:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/ecomdb"
    jdbc_user =>
    jdbc_password =>
    tracking_column => "regdate"
    use_column_value => true
    statement => "select c.id as customerid, c.firstname, c.lastname, c.email, c.regdate, od.orderid, od.product, od.description, od.price, od.ordertime from customer as c left join orders as od on c.id = od.customerid where c.regdate > :sql_last_value;"
    schedule => "* * * * * *"
  }
}

filter {
  ruby {
    path => 'sampleRuby.rb'
  }
}

output {
  elasticsearch {
    document_id => "%{customerid}"
    document_type => "doc"
    index => "test"
    hosts => ["http://localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}

Note: path points to the Ruby script file that implements the filter method. The Ruby file should be located in the same directory as logstash-sample.conf.

4. Run the above config file using the command below:

logstash -f logstash-sample.conf
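If you want to validate the pipeline configuration first, Logstash can parse the file and exit without starting the pipeline by adding the standard --config.test_and_exit flag:

logstash -f logstash-sample.conf --config.test_and_exit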

Once the pipeline has run, a search on the test index returns the following:

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 5,
    "max_score": 1,
    "hits": [
      {
        "_index": "test",
        "_type": "doc",
        "_id": "5",
        "_score": 1,
        "_source": {
          "orderDetails": {
            "orderid": 1,
            "description": "Wilson Australian Open",
            "product": "Tennis Ball",
            "ordertime": "2019-01-22T14:51:49.000Z",
            "price": 330
          },
          "@version": "1",
          "email": "[email protected]",
          "@timestamp": "2019-02-02T14:13:46.754Z",
          "regdate": "2019-01-23T16:51:49.000Z",
          "firstname": "Jimmy",
          "customerid": 5,
          "lastname": "Connors"
        }
      }
    ]
  }
}

As shown above, we have added a nested JSON object to our existing document.
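The response above can be fetched with a plain search request against the test index. A minimal sketch in Ruby (the host and index name come from the config above; the file name verifyIndex.rb is just an assumption):

# verifyIndex.rb - fetch what Logstash wrote to the "test" index
# and pretty-print the search response shown above.
require 'net/http'
require 'json'

uri = URI('http://localhost:9200/test/_search')
body = Net::HTTP.get(uri)                    # simple GET on the _search endpoint
puts JSON.pretty_generate(JSON.parse(body))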

Logstash plugins can serve the purpose of migrating our legacy systems to Elasticsearch. In this way, we have moved the search part of our application to a dedicated search engine instead of relying on the search features provided by our datastore. We are keeping our source of truth in the SQL database, but you could also imagine migrating from the legacy datastore to a NoSQL store.

Note: The JDBC input plugin is not able to track delete events (hard deletes) in your database. You may consider modifying your database table to add an isdeleted flag and using that column as a tracking column.
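As an illustration of the soft-delete idea (the isdeleted column name and the flagged row are assumptions for this sketch, not part of the original schema):

-- Add a soft-delete flag to the orders table (illustrative sketch only).
ALTER TABLE orders ADD COLUMN isdeleted TINYINT(1) NOT NULL DEFAULT 0;

-- "Delete" an order by flagging the row instead of removing it,
-- so the change stays visible to the JDBC input plugin.
UPDATE orders SET isdeleted = 1 WHERE orderid = 3;

The join statement in logstash-sample.conf could then take the flag into account, for example by filtering on od.isdeleted = 0 or by indexing the flag so deleted orders can be excluded at query time.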
