킹장
471
2021-01-29 14:18:38 작성 2021-01-29 14:19:31 수정됨
6
909

elasitcsearch 개발 중에 logstash 인덱싱 중 오류가 발생하여 질문드립니다.



 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 215, column 1 (byte 8285) after ", :backtrace=>["/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/logstash-7.1.1/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/logstash-7.1.1/logstash-core/lib/logstash/java_pipeline.rb:23:in `initialize'", "/logstash-7.1.1/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/logstash-7.1.1/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}
  
이러한 오류가 발견됬고 제 config 파일 보여드립니다. 
어디 부분이 오류인줄 잘모르겠습니다 조언 부탁드립니다 선배님들  
    209                         timeout_code => "
    210                         elapsed_time = Time.now - event.timestamp.time
    211                         event.tag('last') if elapsed_time >= 3
    212                         "
    213 }
    214 }
    215 date{
    216 match => ["setdt","yyyyMMdd'T'HHmmss"]
    217 timezone => "Asia/Seoul"
    218 target => "setdt"
    219 }
    220 mutate {
    221
    222         add_field => {"msg_id" => "%{bid}%{spoid}"}
    223
    224 }
    225
    226 output {
    227         elasticsearch {
    228                 hosts => ["search-everon-jkgsx53bmgr3lhvcwpgylxnd7e.ap-northeast-2.es.amazonaws.com:443"]
    229                 ssl => true
    230                 index => "everon_spot"
    231                 ilm_enabled => false
    232                 document_id => "%{msg_id}"
    233
    234
    235         }
    236                 stdout{codec => json}
    237 }
    238

0
  • 답변 6

  • occidere
    32
    2021-01-29 15:15:21

    에러 내용에 답이 나와있네요


    :message=>"Expected one of #, input, filter, output at line 215, column 1 (byte 8285) after "


    이미 아시겠지만 Logstash 파이프라인은 1개의 input, output 과 0개 이상의 filter 로 이뤄져 있습니다.

    올려주신 코드에선 filter 와 output 은 확인되나 input 부분이 누락되어 있는것 같습니다.


    혹시 input 이 존재한다면, 전체 코드 (209줄 이전부분도 포함) 을 올려주시면 더 도움이 될 것 같습니다

  • 킹장
    471
    2021-01-29 15:20:41


     1 # Sample Logstash configuration for creating a simple
          2
          3
          4
          5
          6 # Beats -> Logstash -> Elasticsearch pipeline.
          7
          8 input {
          9
         10 jdbc {
         11  jdbc_validate_connection => true
         12  jdbc_driver_library => "/logstash-7.1.1/lib/ojdbc8.jar"
         13  jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
         14  jdbc_connection_string => "DB 주소 "
         15  jdbc_user => "유저"
         16  jdbc_password => "비번"
         17  jdbc_paging_enabled => true
         18  schedule => "*/1 * * * * Asia/Seoul"
         19
         20  statement => "SELECT
         21             NVL(UPDAT,SYSDATE) AS UPDAT,
         22             'EV' AS BID,
         23             B.FREE,
         24             B.KIND,
         25             B.SPOID,
         26             B.SPONAM,
         27             B.PLANAM,
         28             B.ADDCOD,
         29             B.ZIPADMSI,
         30             B.ZIPADMGU,
         31             B.ZIPCOD,
         32             B.ADD1,
         33             B.ADD2,
         34             B.PLADESC,
         35             TO_CHAR(B.LON) AS LON,
         36             TO_CHAR(B.LAT) AS LAT,
         37              B.DCCHACOD,
         38             CASE WHEN A.OKCNT > 0 THEN 'ONGREEN' WHEN A.USECNT = TOTALCNT THEN 'ONORANGE' ELSE 'ONGRAY' END AS STACOLOR,
         39             CASE WHEN A.OKCNT > 0 THEN OKCNT WHEN A.USECNT = TOTALCNT THEN A.USECNT ELSE A.NOCNT END AS CNT,
         40             CASE WHEN A.OKCNT > 0 THEN 0 WHEN A.USECNT = TOTALCNT THEN 1 ELSE 2 END AS ORD,
         41              B.SPOID AS ID,
         42             B.SPONAM AS TEXT
         43             ,TOTALCNT
         44            FROM
         45             (SELECT
         46         SPOID,
         47         SUM(CHAOK) OKCNT,
         48         SUM(CHAUSE) USECNT,
         49         SUM(CHANO) NOCNT,
         50         COUNT(SPOID) TOTALCNT
         51      FROM
         52         (SELECT
         53              A.SPOID
         54             , TO_NUMBER(TO_CHAR(SYSDATE, 'd')) DAYNUM
         55             , CASE WHEN (CASE WHEN (SYSDATE > NVL(D.COLLECT_DATE,TO_DATE('20180101','YYYYMMDD'))+1/24/60*30) THEN 'CHRS02' ELSE NVL(D.CHASTATUS,'CHRS01') END) IN ('CHRS03') THEN 1 ELSE 0 END         AS CHAOK
     168  jdbc_fetch_size => 1000
        169  use_column_value => true
        170  tracking_column => "updat"
        171  tracking_column_type => "timestamp"
        172  #last_run_metadata_path => "/logstash/.logstash_jdbc_last_run"
        173  record_last_run => true
        174  clean_run => true
        175
        176   }
        177 }
        178 filter {
        179         aggregate{
        180                 task_id => "%{spoid}_%{bid}"
        181                 code =>"
        182                         map['spoid'] =  event.get('spoid')
        183                         map['bid'] = event.get('bid')
        184                         map['updat'] = event.get('updat')
        185                         map['free'] = event.get('free')
        186                         map['kind'] = event.get('kind')
        187                         map['sponam'] = event.get('sponam')
        188                         map['planam'] = event.get('planam')
        189                         map['addcod'] = event.get('addcod')
        190                         map['zipadmsi'] = event.get('zipadmsi')
        191                         map['zipadmgu'] = event.get('zipadmgu')
        192                         map['zipcod'] = event.get('zipcod')
        193                         map['add1'] = event.get('add1')
        194                         map['add2'] = event.get('add2')
        195                         map['pladesc'] = event.get('pladesc')
        196                         map['lon'] = event.get('lon')
        197                         map['lat'] = event.get('lat')
        198                         map['stacolor'] = event.get('stacolor')
        199                         map['cnt'] = event.get('cnt')
        200                         map['ord'] = event.get('ord')
        201                         map['id'] = event.get('id')
        202                         map['text'] = event.get('text')
        203                         map['spotman'] ||= []
        204                         map['spotman'] << {'dcchacod' => event.get('dcchacod')}
        205                         "
        206                         timeout_tags => ["aggregate"]
        207                         push_previous_map_as_event => true
        208                         timeout => 3
        209                         timeout_code => "
        210                         elapsed_time = Time.now - event.timestamp.time
        211                         event.tag('last') if elapsed_time >= 3
        212                         "
        213 }
        214 }
        215 date{
        216 match => ["setdt","yyyyMMdd'T'HHmmss"]

    디비 정보와 아디 비번은 제외하고 다 보여드렸습니다 ㅠ
  • occidere
    32
    2021-01-29 15:32:32 작성 2021-01-29 15:35:48 수정됨

    date 필터와 mutate 필터를 각각 filter { ... } 로 둘러 주시거나, 위의 aggregate 가 포함된 filter { ... } 에 편입시켜 주시면 될것 같습니다.


    AS-IS

    filter {
        aggregate{
            task_id => "%{spoid}_%{bid}"
            code =>"
                map['spoid'] = event.get('spoid')
                map['bid'] = event.get('bid')
                map['updat'] = event.get('updat')
                map['free'] = event.get('free')
                map['kind'] = event.get('kind')
                map['sponam'] = event.get('sponam')
                map['planam'] = event.get('planam')
                map['addcod'] = event.get('addcod')
                map['zipadmsi'] = event.get('zipadmsi')
                map['zipadmgu'] = event.get('zipadmgu')
                map['zipcod'] = event.get('zipcod')
                map['add1'] = event.get('add1')
                map['add2'] = event.get('add2')
                map['pladesc'] = event.get('pladesc')
                map['lon'] = event.get('lon')
                map['lat'] = event.get('lat')
                map['stacolor'] = event.get('stacolor')
                map['cnt'] = event.get('cnt')
                map['ord'] = event.get('ord')
                map['id'] = event.get('id')
                map['text'] = event.get('text')
                map['spotman'] ||= []
                map['spotman'] << {'dcchacod' => event.get('dcchacod')}
                "
            timeout_tags => ["aggregate"]
            push_previous_map_as_event => true
            timeout => 3
            timeout_code => "
                elapsed_time = Time.now - event.timestamp.time
                event.tag('last') if elapsed_time >= 3
                "
        }
    }
    
    date {
        match => ["setdt","yyyyMMdd'T'HHmmss"]
        timezone => "Asia/Seoul"
        target => "setdt"
    }
    
    mutate {
        add_field => {"msg_id" => "%{bid}%{spoid}"}
    }
    



    TO-BE

    filter {
        aggregate{
            task_id => "%{spoid}_%{bid}"
            code =>"
                map['spoid'] = event.get('spoid')
                map['bid'] = event.get('bid')
                map['updat'] = event.get('updat')
                map['free'] = event.get('free')
                map['kind'] = event.get('kind')
                map['sponam'] = event.get('sponam')
                map['planam'] = event.get('planam')
                map['addcod'] = event.get('addcod')
                map['zipadmsi'] = event.get('zipadmsi')
                map['zipadmgu'] = event.get('zipadmgu')
                map['zipcod'] = event.get('zipcod')
                map['add1'] = event.get('add1')
                map['add2'] = event.get('add2')
                map['pladesc'] = event.get('pladesc')
                map['lon'] = event.get('lon')
                map['lat'] = event.get('lat')
                map['stacolor'] = event.get('stacolor')
                map['cnt'] = event.get('cnt')
                map['ord'] = event.get('ord')
                map['id'] = event.get('id')
                map['text'] = event.get('text')
                map['spotman'] ||= []
                map['spotman'] << {'dcchacod' => event.get('dcchacod')}
                "
            timeout_tags => ["aggregate"]
            push_previous_map_as_event => true
            timeout => 3
            timeout_code => "
                elapsed_time = Time.now - event.timestamp.time
                event.tag('last') if elapsed_time >= 3
                "
        }
    }
    
    filter {
        date {
            match => ["setdt","yyyyMMdd'T'HHmmss"]
            timezone => "Asia/Seoul"
            target => "setdt"
        }
    }
    
    filter {
        mutate {
            add_field => {"msg_id" => "%{bid}%{spoid}"}
        }
    }
    

    또는


    filter {
        aggregate{
            task_id => "%{spoid}_%{bid}"
            code =>"
                map['spoid'] = event.get('spoid')
                map['bid'] = event.get('bid')
                map['updat'] = event.get('updat')
                map['free'] = event.get('free')
                map['kind'] = event.get('kind')
                map['sponam'] = event.get('sponam')
                map['planam'] = event.get('planam')
                map['addcod'] = event.get('addcod')
                map['zipadmsi'] = event.get('zipadmsi')
                map['zipadmgu'] = event.get('zipadmgu')
                map['zipcod'] = event.get('zipcod')
                map['add1'] = event.get('add1')
                map['add2'] = event.get('add2')
                map['pladesc'] = event.get('pladesc')
                map['lon'] = event.get('lon')
                map['lat'] = event.get('lat')
                map['stacolor'] = event.get('stacolor')
                map['cnt'] = event.get('cnt')
                map['ord'] = event.get('ord')
                map['id'] = event.get('id')
                map['text'] = event.get('text')
                map['spotman'] ||= []
                map['spotman'] << {'dcchacod' => event.get('dcchacod')}
                "
            timeout_tags => ["aggregate"]
            push_previous_map_as_event => true
            timeout => 3
            timeout_code => "
                elapsed_time = Time.now - event.timestamp.time
                event.tag('last') if elapsed_time >= 3
                "
        }
    
        date {
            match => ["setdt","yyyyMMdd'T'HHmmss"]
            timezone => "Asia/Seoul"
            target => "setdt"
        }
    
        mutate {
            add_field => {"msg_id" => "%{bid}%{spoid}"}
        }
    }

  • 킹장
    471
    2021-01-29 15:38:17 작성 2021-01-29 15:40:09 수정됨

    filter 에 묶어주니 다른 다른 오류를 직면했습니다 ...


    [2021-01-29T06:36:25,215][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
    
    /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
    
    { 2012 rufus-scheduler intercepted an error:
    
      2012   job:
    
      2012     Rufus::Scheduler::CronJob "*/1 * * * * Asia/Seoul" {}
    
      2012   error:
    
      2012     2012
    
      2012     LogStash::ConfigurationError
    
      2012     Java::oracle.jdbc.driver.OracleDriver not loaded. Are you sure you've included the correct jdbc driver in :jdbc_driver_library?
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/jdbc.rb:163:in `open_jdbc_connection'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/plugin_mixins/jdbc/jdbc.rb:221:in `execute_statement'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:277:in `execute_query'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/logstash-input-jdbc-4.3.13/lib/logstash/inputs/jdbc.rb:258:in `block in run'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
    
      2012       org/jruby/RubyKernel.java:1425:in `loop'
    
      2012       /logstash-7.1.1/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
    
      2012   tz:
    
      2012     ENV['TZ']:
    
      2012     Time.now: 2021-01-29 06:37:01 UTC
    
      2012   scheduler:
    
      2012     object_id: 2010
    
      2012     opts:
    
      2012       {:max_work_threads=>1}
    
      2012       frequency: 0.3
    
      2012       scheduler_lock: #<Rufus::Scheduler::NullLock:0x5aa6079b>
    
      2012       trigger_lock: #<Rufus::Scheduler::NullLock:0x7a8489ca>
    
      2012     uptime: 37.303252 (37s305)
    
      2012     down?: false
    
      2012     threads: 2
    
      2012       thread: #<Thread:0x6d627411>
    
      2012       thread_key: rufus_scheduler_2010
    
      2012       work_threads: 1
    
      2012         active: 1
    
      2012         vacant: 0
    
      2012         max_work_threads: 1
    
      2012       mutexes: {}
    
      2012     jobs: 1
    
      2012       at_jobs: 0
    
      2012       in_jobs: 0
    
      2012       every_jobs: 0
    
      2012       interval_jobs: 0
    
      2012       cron_jobs: 1
    
      2012     running_jobs: 1
    
      2012     work_queue: 0
    
    } 2012 .



  • occidere
    32
    2021-01-29 15:43:44

    ojdbc 설정 오류인것 같다고 나와있네요..

    Java::oracle.jdbc.driver.OracleDriver not loaded. Are you sure you've included the correct jdbc driver in :jdbc_driver_library?


    올바른 버전인지 경로가 잘못되진 않았는지 확인해보면 될 것 같습니다

    jdbc_driver_library => "/logstash-7.1.1/lib/ojdbc8.jar"


  • 킹장
    471
    2021-01-29 16:15:14

    ㅎㅎ 해결했습니다 너무감사합니다 

  • 로그인을 하시면 답변을 등록할 수 있습니다.