近期遇到一個需求,原本我們在Elasticsearch的mapping中並沒有加入created_time,這種紀錄doc第一次寫入的field,原因是我們的資料會持續不斷的送進Elasticsearch,且資料量非常龐大,必須採用bulk update的upsert方式來做持續寫入或更新才能跟得上資料成長速度,我們不知道進來的資料是已經存在Elasticsearch或是沒有,所以每一次bulk的資料都是完整的所有fields,如果field內容一樣就不被更新,如果是新資料就被index。
但是created_time這種欄位只能紀錄doc第一次進來被index的時間,如果用上述進資料會給所有fields的方式的話,created_time就會一直被更新,而無法紀錄第一次進來的時間,等於updated_time。
當然有另一種方式是資料先進行查詢,看看有沒有這筆doc在看要送什麼field到bulk的action,但是這樣會增加非常大的寫入時間成本。
下面介紹的方式也會增加寫入時間成本,但會比先查詢是否有這筆資料來的快許多(雖然還是很慢),我們會用Elasticsearch的script和store script功能。
以下的範例請注意我使用的是Elasticsearch 5.4
版,其他的版本請根據該版本document更新相關用法。
最初版bulk action
這是最原始純用bulk與action包含script的方式建立created_time,意思是upsert一筆資料後,再用script去看看它有沒有created_time field,沒有的話新增,有的話不做動作。
from datetime import datetime from elasticsearch import (Elasticsearch, helpers) es = Elasticsearch('localhost:9200', timeout=60) actions = [ { '_op_type': 'update', 'doc_as_upsert': True, '_index': 'myindex', '_type': 'mytype', '_id': 'test1', 'doc': { "field1": '1 2 3 4 5 6', "field2": '4 5 6 7 8' } }, { '_op_type': 'update', '_index': 'myindex', '_type': 'mytype', '_id': 'test1', "script": { "inline": "if( !ctx._source.containsKey(\"created_time\") ){ ctx._source.created_time = params.created_time; }", "params": { "created_time": datetime.now().strftime('%Y-%m-%d %H:%M') } }, } ] print(helpers.bulk(es, actions, stats_only=True))
在網路上其他建議中,可以把script放到Elasticsearch的cache中,會加快一點速度與讓request簡便一點。
Elasticsearch stored script
POST _scripts/set_created_time { "script": { "lang": "painless", "code": "if( !ctx._source.containsKey(\"created_time\") ){ ctx._source.created_time = params.created_time; }" } }
修改bulk action
from datetime import datetime from elasticsearch import (Elasticsearch, helpers) es = Elasticsearch('localhost:9200', timeout=60) actions = [ { '_op_type': 'update', 'doc_as_upsert': True, '_index': 'myindex', '_type': 'mytype', '_id': 'test1', 'doc': { "field1": '1 2 3 4 5 6', "field2": '4 5 6 7 8' } }, { '_op_type': 'update', '_index': 'myindex', '_type': 'mytype', '_id': 'test1', "script": { "stored": "set_created_time", "params": { "created_time": datetime.now().strftime('%Y-%m-%d %H:%M') } }, } ] print(helpers.bulk(es, actions, stats_only=True))
改變的地方很簡單,script的部分換成呼叫stored script的id而已。
Conclusion
我們的資料量大約200-400/s,docker swarm分出10 replicas分散式寫入,Elasticsearch Cluster data node 12台,以上述的介紹方式還是跟不上需要的寫入速度,所以最後還是用其他方式解需求。
不過也許對其他情境會是一個可行的做法,分享給大家。
References
- How to use scripts, https://www.elastic.co/guide/en/elasticsearch/reference/5.4/modules-scripting-using.html
- Elasticsearch specification, https://gist.github.com/ashishtiwari1993/004a19f4a44efc214403a7fc1ee27cda#challenge-1-