NetSuite のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for NetSuiteと組み合わせることで、Kafka はライブのNetSuite のデータを扱うことができます。この記事では、NetSuite データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したNetSuite のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのNetSuite のデータとのやり取りにおいて比類のないパフォーマンスを提供します。NetSuite に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接NetSuite にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してNetSuite のデータを操作および分析できます。
NetSuite データ連携について
CData は、Oracle NetSuite のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:
- Standard、CRM、OneWorld を含む、すべてのエディションの NetSuite にアクセスできます。
- SuiteTalk API(SOAP ベース)のすべてのバージョンと、SQL のように機能し、より簡単なデータクエリと操作を可能にする SuiteQL に接続できます。
- Saved Searches のサポートにより、事前定義されたレポートとカスタムレポートにアクセスできます。
- トークンベースおよび OAuth 2.0 で安全に認証でき、あらゆるユースケースで互換性とセキュリティを確保します。
- SQL ストアドプロシージャを使用して、ファイルのアップロード・ダウンロード、レコードや関連付けのアタッチ・デタッチ、ロールの取得、追加のテーブルやカラム情報の取得、ジョブ結果の取得などの機能的なアクションを実行できます。
お客様は、Power BI や Excel などのお気に入りの分析ツールからライブ NetSuite データにアクセスするために CData ソリューションを使用しています。また、CData Sync を直接使用するか、Azure Data Factory などの他のアプリケーションとの CData の互換性を活用して、NetSuite データを包括的なデータベースやデータウェアハウスに統合しています。CData は、Oracle NetSuite のお客様が NetSuite からデータを取得し、NetSuite にデータをプッシュするアプリを簡単に作成できるよう支援し、他のソースからのデータを NetSuite と統合することを可能にしています。
当社の Oracle NetSuite ソリューションの詳細については、ブログをご覧ください:Drivers in Focus Part 2: Replicating and Consolidating ... NetSuite Accounting Data
はじめに
前提条件
Apache Kafka トピックでNetSuite のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
NetSuite のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for NetSuiteをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- NetSuite という名前の新しいディレクトリを作成します。
mkdir NetSuite
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv NetSuiteJDBCDriver.zip NetSuite/
- CData NetSuiteJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip NetSuiteJDBCDriver.zip
- NetSuite という名前の新しいディレクトリを作成します。
- NetSuite ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for NetSuite のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.netsuite.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for NetSuite/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData NetSuite JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.netsuite.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for NetSuite を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_netsuite_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:netsuite:AccountId=XABC123456;Schema=SuiteTalk;AuthScheme=Token;OAuthClientId=MyOAuthClientId;OAuthClientSecret=MyOAuthClientSecret;OAuthAccessToken=MyOAuthAccessToken;OAuthAccessTokenSecret=MyOAuthAccessTokenSecret;", "topic.prefix": "netsuite-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: NetSuite データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for NetSuiteに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.netsuite.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
NetSuiteへの接続
NetSuite では、2種類のAPI でデータにアクセスできます。どちらのAPI を使用するかは、Schema 接続プロパティで以下のいずれかを選択して指定してください。
- SuiteTalk は、NetSuite との通信に使用されるSOAP ベースの従来から提供されているサービスです。幅広いエンティティをサポートし、INSERT / UPDATE / DELETE の操作も対応しています。ただし、SuiteQL API と比べるとデータの取得速度が劣ります。また、サーバーサイドでのJOIN に対応していないため、これらの処理はCData 製品がクライアントサイドで実行します。
- SuiteQL は、より新しいAPI です。JOIN、GROUP BY、集計、カラムフィルタリングをサーバーサイドで処理できるため、SuiteTalk よりもはるかに高速にデータを取得できます。ただし、NetSuite データへのアクセスは読み取り専用となります。
データの取得のみが目的でしたらSuiteQL をお勧めします。データの取得と変更の両方が必要な場合は、SuiteTalk をお選びください。
NetSuite への認証
CData 製品では、以下の認証方式がご利用いただけます。
- トークンベース認証(TBA)はOAuth1.0に似た仕組みです。2020.2以降のSuiteTalk とSuiteQL の両方で利用できます。
- OAuth 2.0 認証(OAuth 2.0 認可コードグラントフロー)は、SuiteQL でのみご利用いただけます。
- OAuth JWT 認証は、OAuth2.0 クライアント認証フローの一つで、クライアント認証情報を含むJWT を使用してNetSuite データへのアクセスを要求します。
トークンベース認証(OAuth1.0)
トークンベース認証(TBA)は、基本的にOAuth 1.0 の仕組みです。この認証方式はSuiteTalk とSuiteQL の両方でサポートされています。管理者権限をお持ちの方がNetSuite UI 内でOAuthClientId、OAuthClientSecret、OAuthAccessToken、OAuthAccessTokenSecret を直接作成することで設定できます。 NetSuite UI でのトークン作成手順については、ヘルプドキュメントの「はじめに」セクションをご参照ください。
アクセストークンを作成したら、以下の接続プロパティを設定して接続してみましょう。
- AuthScheme = Token
- AccountId = 接続先のアカウント
- OAuthClientId = アプリケーション作成時に表示されるコンシューマーキー
- OAuthClientSecret = アプリケーション作成時に表示されるコンシューマーシークレット
- OAuthAccessToken = アクセストークン作成時のトークンID
- OAuthAccessTokenSecret = アクセストークン作成時のトークンシークレット
その他の認証方法については、ヘルプドキュメントの「はじめに」をご確認ください。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「netsuite-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、NetSuite のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for NetSuiteの30日間無償トライアルをダウンロードして、NetSuite データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。