Elasticsearch, How not to update specific field with frequently updated documents, using python、bulk、upsert and ES script

0
50

近期遇到一個需求,原本我們在Elasticsearch的mapping中並沒有加入created_time,這種紀錄doc第一次寫入的field,原因是我們的資料會持續不斷的送進Elasticsearch,且資料量非常龐大,必須採用bulk update的upsert方式來做持續寫入或更新才能跟得上資料成長速度,我們不知道進來的資料是已經存在Elasticsearch或是沒有,所以每一次bulk的資料都是完整的所有fields,如果field內容一樣就不被更新,如果是新資料就被index。

但是created_time這種欄位只能紀錄doc第一次進來被index的時間,如果用上述進資料會給所有fields的方式的話,created_time就會一直被更新,而無法紀錄第一次進來的時間,等於updated_time。

當然有另一種方式是資料先進行查詢,看看有沒有這筆doc在看要送什麼field到bulk的action,但是這樣會增加非常大的寫入時間成本。

下面介紹的方式也會增加寫入時間成本,但會比先查詢是否有這筆資料來的快許多(雖然還是很慢),我們會用Elasticsearch的script和store script功能。

以下的範例請注意我使用的是Elasticsearch 5.4版,其他的版本請根據該版本document更新相關用法。

最初版bulk action

這是最原始純用bulk與action包含script的方式建立created_time,意思是upsert一筆資料後,再用script去看看它有沒有created_time field,沒有的話新增,有的話不做動作。

from datetime import datetime
from elasticsearch import (Elasticsearch, helpers)

es = Elasticsearch('localhost:9200', timeout=60)

actions = [
    {
        '_op_type': 'update',
        'doc_as_upsert': True,
        '_index': 'myindex',
        '_type': 'mytype',
        '_id': 'test1',
        'doc': {
            "field1": '1 2 3 4 5 6',
            "field2": '4 5 6 7 8'
        }
    },
    {
        '_op_type': 'update',
        '_index': 'myindex',
        '_type': 'mytype',
        '_id': 'test1',
        "script": {
            "inline": "if( !ctx._source.containsKey(\"created_time\") ){ ctx._source.created_time = params.created_time; }",
            "params": {
                "created_time": datetime.now().strftime('%Y-%m-%d %H:%M')
            }
        },
    }
]

print(helpers.bulk(es, actions, stats_only=True))

在網路上其他建議中,可以把script放到Elasticsearch的cache中,會加快一點速度與讓request簡便一點。

Elasticsearch stored script

POST _scripts/set_created_time
{
	"script": {
		"lang": "painless",
		"code": "if( !ctx._source.containsKey(\"created_time\") ){ ctx._source.created_time = params.created_time; }"
	}
}

修改bulk action

from datetime import datetime

from elasticsearch import (Elasticsearch, helpers)

es = Elasticsearch('localhost:9200', timeout=60)

actions = [
    {
        '_op_type': 'update',
        'doc_as_upsert': True,
        '_index': 'myindex',
        '_type': 'mytype',
        '_id': 'test1',
        'doc': {
            "field1": '1 2 3 4 5 6',
            "field2": '4 5 6 7 8'
        }
    },
    {
        '_op_type': 'update',
        '_index': 'myindex',
        '_type': 'mytype',
        '_id': 'test1',
        "script": {
            "stored": "set_created_time",
            "params": {
                "created_time": datetime.now().strftime('%Y-%m-%d %H:%M')
            }
        },
    }
]

print(helpers.bulk(es, actions, stats_only=True))

改變的地方很簡單,script的部分換成呼叫stored script的id而已。

Conclusion

我們的資料量大約200-400/s,docker swarm分出10 replicas分散式寫入,Elasticsearch Cluster data node 12台,以上述的介紹方式還是跟不上需要的寫入速度,所以最後還是用其他方式解需求。

不過也許對其他情境會是一個可行的做法,分享給大家。

References

  1. How to use scripts, https://www.elastic.co/guide/en/elasticsearch/reference/5.4/modules-scripting-using.html
  2. Elasticsearch specification, https://gist.github.com/ashishtiwari1993/004a19f4a44efc214403a7fc1ee27cda#challenge-1-
Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments