본문 바로가기
데이터 엔지니어( 실습 정리 )/elasticsearch

4. 데이터 다루기 (1, 2)

by 세용용용용 2024. 12. 14.

[ 엘라스틱 서치 바이블 ] 공부한 후 정리한 내용 입니다!!!

 

 

1. 단건 문서 API

  • 인덱스에 문서 ( 색인, 조회, 업데이트, 삭제 ) 하는 api

 

(1-1) 색인 api

### 색인 api ( 매서드 : put, post )
### _doc, _create 지정해 호출
curl -XPUT "http://192.168.219.20:9200/[인덱스명]/_doc/[_id 값]"
curl -XPOST "http://192.168.219.20:9200/[인덱스명]/_doc"
>>> _doc 대신 _create 사용 가능 
>>> _create 사용시 항상 새 문서를 색인하는 경우만 허용 ( 덮어쓰기 금지 )

### _create 지정한 색인
curl -XPUT "http://192.168.219.20:9200/sy_index/_create/1?pretty" -H "Content-Type: application/json" -d \
'{
  "title": "hello jiwon elasticsearch"
}'

_create 지정한 호출 실패

  • _create 지정한 색인은 덮어쓰기 금지!! ( already exists 이미 존재함 ㅋㅋ )

 

 

라우팅

  • 라우팅 미 지정시 _id 해시값 기반 샤드가 배정
### 라우팅 지정 ( routing=[라우팅 명] )
curl -XPUT "http://192.168.219.20:9200/sy_index/_doc/2?routing=sy0218" -H "Content-Type: application/json" -d \
'{
  "login_id": "sy0218",
  "title": "jiwon love",
  "created_ad": "2024-12-14"
}'

>>> 조회 : curl -XGET "http://192.168.219.20:9200/sy_index/_doc/2?pretty"

라우팅 지정한 문서 조회 결과

  • 라우팅 지정시 _routing 필드에 라우티명 확인 가능

 

 

refresh

  • 색인시 refresh 매개변수 지정하면 문서 색인 후 해당 샤드를 refresh 해서 즉시 검색 가능하게 만들 것인지 지정 가능

refresh 매개 변수

  • 하지만 refresh 매개변수 지정시 고려 필요..
  • 매번 refresh 호출시 클러스터 성능 저하..
  • 너무 많은 작은 세그먼트 생성 ( 색인 이후 검색 성능 떨어짐, 추후 세그먼트 병합 과정도 부담이 커짐 )
  • wait_for 기본값은 1000 ( 초당 1000건 이상 색인시 강제 refresh 수행 )
  • 대량 색인 필요시 단건 색인이 아닌 bulk api 사용해야됨

 

(1-2) 조회 api

  • 문서 단건 조회, refresh 되지 않은 상태서 변경된 내용 확인 가능
  • 인덱스 명 + _id값 을 명시하고 호출
  • 조회 api에서 라우팅 사용시 색인했을 떄 라우팅 명과 동일하게 지정 해줘야됨
### 조회 api
curl -XGET "http://192.168.219.20:9200/sy_index/_doc/2?pretty"

>>> _doc는 문서 메타데이터도 함께 조회
>>> _source는 문서 본문만 조회

 

 

필드 필터링

  • 조회 api 사용시 ( _source_includes, _source_excludes ) 옵션 사용해 원하는 필드만 필터 가능
  • 와일드 카드 사용 가능
### 필드 필터링 테스트
curl -XGET "http://192.168.219.20:9200/sy_index/_source/2?pretty&_source_includes=login_id,c*,t*&_source_excludes=created_ad"

>>> _source_incldes : 포함 필터
>>> _source_excludes : 제외 필터

 

 

(1-3) 업데이트 api

  • 지정 문서 업데이트, 요청 본문에 ( doc, script ) 지정해 업데이트 내용 기술
  • es 업데이트는 기존 문서 수정하는 것이 아님!!
  • 문서 조회 내용 + 업데이트 내용 합쳐 새 문서를 색인하는 형태
  • 즉, _source를 비활성화 시킨경우 업데이트 api 사용 불가!!
  • 업데이트도 라우팅과 refresh 옵션 지정가능 ( 라우팅 색인 시와 동일, refresh는 성능 고려 )
### doc 업데이트
curl -XPOST "http://192.168.219.20:9200/sy_index/_update/2" -H "Content-Type: application/json" -d \
'{
  "doc": {"login_id": "jiwon0725"}
}'

 

 

detect_noop

  • es는 업데이터 작업 전 noop( 기존 변경이 없음 ) 인지 확인, noop이면 쓰기 작업 수행 안함
  • noop 검사를 통해 불필요한 디스크 I/O 줄임

업데이트 변경이 없을 경우 쓰기 작업 안함

 

 

doc_as_upsert

  • es 업데이트는 기존 문서가 없으면 요청이 실패
  • 기존 문서 없을 경우 새로 문서를 추가하는 기능
### doc_as_upsert 테스트
curl -XPOST "http://192.168.219.20:9200/sy_index/_update/4?pretty" -H "Content-Type: application/json" -d \
'{
  "doc": {"view": 27}, "doc_as_upsert": true
}'

 

 

script를 이용 업데이트

### script 업데이트
curl -XPOST "http://192.168.219.20:9200/sy_index/_update/4?pretty" -H "Content-Type: application/json" -d \
'{
  "script": {
    "source": "ctx._source.view += params.amount",
    "lang": "painless",
    "params": {"amount": 10}
  },
  "scripted_upsert": false
}'

>> script : 스크립트 사용 업데이트 시 script 필드안에 기술
>> source : 스크립트 본문이 기술
>> lang : 스크립트 언어 종류 지정
>> params : 스크립트 본문에 사용할 매개변수
>> scripted_upsert : 업데이트시 upsert 동작 여부 지정
  • 스크립트 사용 업데이트는 ctx 변수를 통해 문서 내용이나 메타데이터 접근 가능

 

(1-4) 삭제 api

  • 문서를 삭제 ( DELETE 메서드 사용 )
  • 삭제 api도 라우팅, refresh 지정 가능
### es 문서 삭제
curl -XDELETE "http://192.168.219.20:9200/sy_index/_doc/4?pretty"

 

 

2. 복수 문서 api

  • 한건 단위로 http에 실어 나르는 것은 오버헤드가 큼..
  • 서비스 환경에서 단건 문서 api 보단 복수 문서 api를 활용 ( 성능 차이 큼 )

 

(2-1) bulk API

  • 여러 색인, 업데이트, 삭제 작업을 한번에 보내는 api
  • 본문 요청 json이 아닌 ndjson 형태로 보냄 ( json >>> x-ndjson )
### bulk api 테스트
curl -XPOST "http://192.168.219.20:9200/_bulk?pretty" -H "Content-Type: application/x-ndjson" -d \
'
{"index": {"_index": "bulk_test", "_id": "1"}}
{"field1": "value1"}
{"delete": {"_index": "bulk_test", "_id": "2"}}
{"create": {"_index": "bulk_test", "_id": "3"}}
{"field1": "value3"}
{"update": {"_index": "bulk_test", "_id": "1"}}
{"doc": {"field2": "value2"}}
{"index": {"_index": "bulk_test", "_id": "4", "routing": "a"}}
{"field1": "value4"}
'

>>> 요청 종류, 인덱스, _id 라우팅 등을 기재
>>> 문서 내용이 필요한 요청의 경우 해당 내용 다음 줄에 적으면 됨
>>> 즉, 요청 하나의 크기는 ( 1줄 ~ 2줄 )


>>> api 요청시 ( [인덱스 명]/_bulk ) 인덱스 지정시 기본 인덱스로 지정됨
>>> index : 색인 작업 요청
>>> create : 색인 작업 요청 이지만 문서 존재시 작업 하지 않음
>>> update : 업데이트 요청
>>> delete : 삭제 요청

 

 

bulk API 작업 순서

  • bulk api에 기술 순서로 작업된다는 보장은 없음
  • 조정 노드는 요청을 보고 적절한 주 샤드로 요청을 넘기며, 각 요청은 독자적으로 수행 ( 즉, 순서 보장 안됨 )
  • 하지만 동일 인덱스, _id, 라우팅 조합은 동일 샤드로 넘어가기에 기술된 순서로 동작

 

(2-2) multi get API

  • 여러 문서를 한번에 조회하는 api
  • 단건 조회 api 반복 사용하는 것보다 성능이 좋음
### multi get api 테스트
curl -XGET "http://192.168.219.20:9200/_mget?pretty" -H "Content-Type: application/json" -d \
'{
  "docs": [
    {"_index": "bulk_test", "_id": 1},
    {"_index": "bulk_test", "_id": 4, "routing": "a"},
    {"_index": "my_index2", "_id": 1, "_source": {"include": ["p*"], "exclude": ["point"]}}
  ]
}'

>>> [인덱스 명]/_mget 요청시 기본 인덱스 지정

### ids 만 기술해 요청 가능
curl -XGET "http://192.168.219.20:9200/bulk_test/_mget?pretty" -H "Content-Type: application/json" -d \
'{
  "ids": [1, 4]
}'

 

 

(2-3) update by query

  • 검색 쿼리를 통해 조건 만족하는 문서를 찾은뒤 해당 문서 대상으로 업데이트 실시하는 API
  • script를 통한 업데이트만 지원
### update by query 테스트
curl -XPOST "http://192.168.219.20:9200/bulk_test/_update_by_query?pretty" -H "Content-Type: application/json" -d \
'{
  "script": {
    "source": "ctx._source.field1 = ctx._source.field1 + \"-\" + ctx._id", "lang": "painless"
  },
  "query": {
    "exists": {"field": "field1"}
  }
}'
>>> query 조건에 맞는 문서를 스냅샷으로 찍고
>>> 지정된 스크립트에 맞게 업데이트 실시
>>> 순차적 업데이트중 다른 작업으로 인해 문서가 변경될시 update by query는 찍어 두었던 스냅샷을 통해
변화가 생긴 문서를 확인하고 이를 업데이트 하지 않음!!!
>>> conflicts 매개변수를 통해 충돌 발견 시 작업 중단 및 다음 작업으로 넘어 갈수 있음 ( abort : 중단, proceed : 작업 넘어감 )
>>> update by query 작업중간 중단되더라고 이전까지 작업은 업데이트된 상태로 남음

update_by_query 결과

  • 문서 갯수, 업데이트 수, 충돌, 재시도 등 필드를 확인 가능
  • throttled_millis, requests_per_second, throttled_until_millis 필드는 스로틀링을 위한 필드

 

스로틀링

  • update by query는 데이터를 일괄 수정 하는 작업에 많이 활용
  • 대량 작업 수행시 운영 중 서비스에 영향을 줄수 있는데 이런 상황을 피하기 위해 스로틀링 기능 사용
### 스로틀링 테스트
!! 스로틀링을 적용해 작업의 속도를 조절하고 클러스터 부하 최소화 가능 !!
curl -XPOST "http://192.168.219.20:9200/bulk_test/_update_by_query \ 
?scroll_size=1000&scroll=1m&requests_per_seconds=500" -H "Content-Type: application/json" -d \
'{
 ~~~
}'

>>> scroll_size=1000
- 업데이트 전 검색을 수행해 문서 1000개를 가져오고 업데이트 수행
- 작업 단위 1000개 단위로 반복

>>> scorll=1m
- 검색 수행시 당시 상태를 검색 문맥에 보존 ( 스냅샷 찍는거 )
- 검색 문색을 얼마나 보존할지 지정
- 1m은 1분 동안 검색 문맥 유지
- scroll_size 작업 시간 만큼 시간을 지정하면됨
- 너무 크면 오버헤드 증가

>>> requests_per_second=500 ( -1은 스로틀링 적용 안함 )
- 스로틀링 실제 적용을 위해 설정 해줘야 하는 옵션
- 초 당 몇개 작업을 수행할 것 인지
- requests_per_second와 scroll_size 값을 조절하는 것이 스로틀링 조정 핵심!!
  • 실제 서비스 운영 중 대량의 update by query 작업시 스로틀링을 적용해 클러스터 부담과 서비스 리스크를 줄이며 안전하게 작업 수행
  • 작업이 종료되는 시간이 오래 걸릴수 있어 tasks api를 사용해 update by query 비동기적 실행
  • 비동기적 실행 :  작업을 es가 백그라운드로 실행하고 task id를 응답으로 받음
  • task id를 통해 작업상태 추적 및 결과 확인 가능

 

 

비동기적 요청과 tasks api

  • 엘라스틱서치 update by query 요청 시 wait_for_completion 매개변수 false로 지정시 비동기적 처리 가능
  • 비동기 요청시 task id를 응답으로 받고 해당 id를 통해 tasks api를 호출함으로써 작업 진행이나 결과 및 취소 가능
### task 작업 등록과 상태 조회 테스트
curl -XPOST "http://192.168.219.20:9200/bulk_test/_update_by_query?pretty&wait_for_completion=false" -H "Content-Type: application/json" -d \
'{
  "script": {
    "source": "ctx._source.field1 = ctx._source.field1 + \"-\" + ctx._id", "lang": "painless"
  },
  "query": {
    "exists": {"field": "field1"}
  }
}'

task_id

 

### task id를 가지고 task 상태 확인
curl -XGET "http://192.168.219.20:9200/.tasks/_doc/O9TSjAv2QfOeLqOunfCD0g:6135?pretty"

>>> 작업 상태 및 결과 확인 가능
>>> 비동기적 실행시 tasks api를 호출하여 작업을 관리


### task 작업 취소
curl -XPOST "http://192.168.219.20:9200/_tasks/O9TSjAv2QfOeLqOunfCD0g:6135/_cancel?pretty"

>>> 작업 진행 중 문제 발생시 작업을 취소
>>> task id를 알아낸 후 작업 취소 요청 가능

 

 

스로틀링 동적 변경

  • 스로틀링 옵션을 동적 변경 가능 ( 유연한 대처를 위해 )
### 스로틀링 동적 변경 테스트
curl -XPOST "http://192.168.219.20:9200/_update_by_query/[task id]/_rethrottle?requests_per_second=[변경 값]"

>>> task id를 입력 후 스로틀링 변경값 기재

 

 

task 결과 삭제

  • .tasks 인덱스에 등록된 작업이 성공, 취소 여부와 상관 없이 등록된 작업 결과는 es에 계속 남음
  • 그렇기에 작업 상황이 충분이 이해 됬으면 .tasks 인덱스에 문서를 삭제
### task 결과 삭제
curl -XDELETE "http://192.168.219.20:9200/.tasks/_doc/O9TSjAv2QfOeLqOunfCD0g:582?pretty"

 

 

슬라이싱

  • 업데이트 성능을 최대로 끌어내 빠른 시간 안에 끝내는 작업시 필요한 방법
  • slices 매개변수로 업데이트를 지정한 개수로 쪼개 병렬적 수행
### 슬라이싱 테스트
!! 기본값은 1, auto는 엘라스틱서치가 적절한 개수를 지정해 작업을 병렬 수행 ( 보통 인덱스 주 샤드 수가 슬라이스 수 )
!! auto가 아닌 숫자도 지정 가능 !!
!! 슬라이스가 주 샤드 수보다 많을시 한 샤드 내에 데이터를 여러 슬라이스에 분배 해야되는데 해당 과정에서 오버헤드 발생 !!
curl -XPOST "http://192.168.219.20:9200/bulk_test/_update_by_query?pretty&slices=auto" -H "Content-Type: application/json" -d \
'{
  "script": {
    "source": "ctx._source.field1 = ctx._source.field1 + \"-\" + ctx._id", "lang": "painless"
  },
  "query": {
    "exists": {"field": "field1"}
  }
}'

 

 

(2-4) delete by query

  • update by query와 동일하게 검색 쿼리로 삭제 대상 지정 뒤 삭제 수행
  • update by query와 동일하게 스냅샷을 찍음
  • conflicts, wait_for_completion, task api 관리, 스로틀링, 슬라이싱 관련 내용도 update by query와 동일
  • 오래된 데이터, 더 이상 사용하지 않는 데이터를 삭제하는 경우에 많이 사용 ( 배치성 작업 많음 )
### delete by query
curl -XPOST "http://192.168.219.20:9200/bulk_test/_delete_by_query?pretty" -H "Content-Type: application/json" -d \
'{
  "query": {
    "wildcard": {
      "field1": {"value": "*value*"}
    }
  }
}'

>>> field1 필드의 값에 value가 포함된 문서를 삭제하는 delete by query

delete by query 실행 결과

  • 총 3개의 문서가 삭제 된것을 확인