Myorisma
20
2019-09-05 15:54:38 작성 2019-09-05 16:17:28 수정됨
1
381

카멜(Camel) sql 컴포넌트 질문입니다.


camel file component를 사용하여 csv파일을 json파일로 변환한 후에 camel kafka component를 사용하여 json파일을 kafka로 보내어 kafka에서 camel sql component를 사용하여 tibero DB에 insert하는 작업을 수행중입니다.


현재 kafka에서 DB로 보내는 작업만 남아있는 상태인데 insert문에서 쿼리문이 작동을 하지않아서 찾아보다가 질문드립니다.


1. kafka에서 DB 로 보내는 코드

<routes xmlns="http://camel.apache.org/schema/spring">

	<!-- kafka topic 'test-json'에서 Consuming한 데이터를 출력하고 DB에 INSERT (자동실행) -->
	<route id="from-kafka-to-DB"  autoStartup="true">
		<!-- kafka component를 이용한 kafka topic(test-json) 설정 -->
		<from uri="kafka:test-json?brokers={{kafka.brokers}}"/> 
		<log message="data received : ${body}" />
		<unmarshal>
			<json library="Jackson"/>
		</unmarshal>
		<log message="format 'Json' : ${body}"/>
		<!-- <to uri="sql:INSERT INTO gmdmf.H_SEATS(NO, SEAT_NUM, COMPANY_NO) 
			VALUES(:#${body[NO]}, :#${body[SEAT_NUM]}, :#${body[COMPANY_NO]})"/> -->
		<to uri="sql:classpath:sql/seats.sql"/>
	</route>
</routes>


2. 에러


2019-09-05 153957.355  INFO 996 --- [testcsvload] o.a.kafka.common.utils.AppInfoParser      Kafka version  2.0.1
2019-09-05 153957.355  INFO 996 --- [testcsvload] o.a.kafka.common.utils.AppInfoParser      Kafka commitId  fa14705e51bd2ce5
2019-09-05 153957.370  INFO 996 --- [ad  producer-1] org.apache.kafka.clients.Metadata         Cluster ID 5VRRrrYvStaV7Z1Bhis_9Q
2019-09-05 153957.389  INFO 996 --- [ucer[test-json]] from-json-to-kafka                        data sended  [{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM1},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM2},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM3},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM4},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM5}]
2019-09-05 153957.392  INFO 996 --- [umer[test-json]] from-kafka-to-DB                          data received  [{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM1},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM2},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM3},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM4},{COMPANY_NO6,NOSEQ_H_SEATS,SEAT_NUM5}]
2019-09-05 153957.419  INFO 996 --- [umer[test-json]] from-kafka-to-DB                          format 'Json'  [{COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=1}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=2}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=3}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=4}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=5}]
2019-09-05 153957.423  INFO 996 --- [umer[test-json]] com.zaxxer.hikari.HikariDataSource        HikariPool-1 - Starting...
2019-09-05 153957.707  INFO 996 --- [umer[test-json]] com.zaxxer.hikari.pool.PoolBase           HikariPool-1 - Driver does not support getset network timeout for connections. (com.tmax.tibero.jdbc.driver.TbConnection.getNetworkTimeout()I)
2019-09-05 153957.720  INFO 996 --- [umer[test-json]] com.zaxxer.hikari.HikariDataSource        HikariPool-1 - Start completed.
2019-09-05 153957.777 ERROR 996 --- [umer[test-json]] o.a.camel.processor.DefaultErrorHandler   Failed delivery for (MessageId ID-DESKTOP-225HFIE-1567665594767-0-4 on ExchangeId ID-DESKTOP-225HFIE-1567665594767-0-3). Exhausted after delivery attempt 1 caught org.apache.camel.language.bean.RuntimeBeanExpressionException Failed to invoke method [NO] on java.util.ArrayList due to java.lang.IndexOutOfBoundsException Key NO not found in bean [{COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=1}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=2}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=3}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=4}, {COMPANY_NO=6, NO=SEQ_H_SEATS, SEAT_NUM=5}] of type java.util.ArrayList using OGNL path [[NO]]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[from-kafka-to-DB  ] [from-kafka-to-DB  ] [kafkatest-jsonbrokers=10.47.39.124%3A7092%2C10.47.39.125%3A7092%2C10.47.39] [       381]
[from-kafka-to-DB  ] [log4              ] [log                                                                           ] [         0]
[from-kafka-to-DB  ] [unmarshal2        ] [unmarshal[org.apache.camel.model.dataformat.JsonDataFormat@289f15e9]          ] [        27]
[from-kafka-to-DB  ] [log5              ] [log                                                                           ] [         0]
[from-kafka-to-DB  ] [to4               ] [sqlclasspathsqlseats.sql                                                      ] [       353] 


3. tibero sql문

INSERT INTO gmdmf.H_SEATS(NO, SEAT_NUM, COMPANY_NO)
VALUES(:#${body[NO]}, :#${body[SEAT_NUM]}, :#${body[COMPANY_NO]})


일치하는 색이 해당 코드의 로그입니다. 

json 형태로 insert를 하려는데 단순한 json 형태를 읽어오지 못하여 NO를 찾지 못하는 것 같습니다.


- json을 어떻게 변형해야 해당 구문에 사용할 수 있을까요.

- split()을 사용하면 된다는 것 같은데 구현 방법을 이해할 수가 없습니다.


0
0
  • 답변 1

  • Myorisma
    20
    2019-09-06 10:18:14 작성 2019-09-06 10:19:33 수정됨

    해결하고 답글 남깁니다.


    에러의 이유는 리스트형의 데이터가 한번에 들어가서 sql에서 읽지 못해서 였습니다.

    그래서 리스트형 pojo를 나누어줄 필요가 있었습니다.


    가장먼저 pojo형태의 리스트형인 body를 나누어서 처리를 해주어야 합니다.그래서 split을 사용하였습니다.


    <routes xmlns="http://camel.apache.org/schema/spring">
    	<route id="KafkaToDb">
    		<!-- test-json 받음 -->
    		<from uri="kafka:test-json?brokers={{kafka.brokers}}"/>
    		<log message="received json: ${body}"/>
    		<!-- json형태의 pojo로 변환 -->
    		<unmarshal>
    			<json library="Jackson"/>
    		</unmarshal>
    		<log message="pojo: ${body}"/>
    		<!-- 리스트 형식인 pojo를 분할 함 -->
    		<split>
    			<!-- 컬렉션이나 리스트를 분할해주는 simple -->
    			<simple>${body}</simple>
    			<log message="data: ${body}"/>
    			<log message="COMPANY_NO: ${body[COMPANY_NO]}"/>
    			<log message="SEAT_NUM: ${body[SEAT_NUM]}"/>
    			<!-- DB에 적재하기 위해 body를 HashMap형태로 변환 -->
    			<convertBodyTo type="java.util.HashMap"/>
    			<!-- sql파일을 읽어와 해당 sql 구문 실행 -->
    			<to uri="sql:classpath:sql/seats.sql"/>
    		</split>
    	</route>
    </routes>


    컬렉션이나 리스트를 나누어주는 simple을 사용해서 자를때마다 DB에 적재했습니다.


    0
  • 로그인을 하시면 답변을 등록할 수 있습니다.