Node-RED フロー: SQLite の新規アカウントを Snowflake データウェアハウスに追加・更新する



Node-RED は、ハードウェアデバイス、API、オンラインサービスを連携したデータフローを作成できるツールです。API Server は、オンプレミスのデータベースからクラウドベースのサービス、フラットファイルまで、250以上のデータソースに対して REST API を作成します。Node-RED から API Server に接続することで、ビジネスに役立つデータフローを作成できます。この記事では、業務システム(SQLite)から新規アカウントをクエリし、同じデータを使用して Snowflake に新規アカウントを作成(または既存のアカウントを更新)するフローの作成方法を説明します。

Node-RED のインストール



Node-RED を実行するには、Node.js をインストールする必要があります。インストール方法の詳細は、Node.js の Web サイトを参照してください。Node.js をインストールしたら、npm を使用して Node-RED をインストールするのが最も簡単な方法です(以下参照)。その他のインストール方法については、Node-RED のインストールページを参照してください。

sudo npm install -g --unsafe-perm node-red

API Server のセットアップ



250以上のサポートされるデータソースのいずれかを使用してフローを構築するには、API Server をインストールして実行します。次に、データソースへの接続を設定します。この記事では、SQLite データベースと Snowflake データウェアハウスへの接続を設定します。以下の手順に従って、SQLite と Snowflake 上にセキュアな Web サービスを作成していきましょう。

デプロイ

API Server は自社サーバーで実行します。Windows では、スタンドアロンサーバーまたは IIS を使用してデプロイできます。Java サーブレットコンテナでは、API Server の WAR ファイルをドロップするだけです。詳細および手順については、ヘルプドキュメントを参照してください。

API Server は、Microsoft AzureAmazon EC2Heroku にも簡単にデプロイできます。

SQLite への接続

Connections をクリックして新しい接続を追加し、SQLite に接続するために必要な接続プロパティを入力します。

SQLite データベースに接続するには、データベースファイルへの Path を指定するだけです。

Snowflake への接続

Connections をクリックして新しい接続を追加し、Snowflake データウェアハウスに接続するために必要な認証値およびその他の接続プロパティを入力します。

Snowflake は OAuth 認証標準を使用します。OAuth では、認証するユーザーがブラウザからログインする必要があります。OAuth を使用して認証するには、Snowflake セキュリティ統合に基づいて OAuthClientIdOAuthClientSecretCallbackURL プロパティを設定します。また、Snowflake アカウントの URLUserWarehouseDatabase、および(オプションで)Schema を指定する必要があります。

OAuth の使用方法については、ヘルプドキュメントの「はじめに」の章を参照してください。

エンドポイントの作成

接続を設定したら、API -> Add Table をクリックして、API Server がアクセスできる SQLite および Snowflake のテーブルを選択します。この記事では、SQLite の Account テーブルと Snowflake の Online Account テーブルのエンドポイントを作成します。

Node-RED フローの構築



API Server からのデータに接続して使用する Node-RED フローの構築は、Node-RED インターフェースでコンポーネントをドラッグ&ドロップするだけの簡単な作業です。Node-RED を起動するには、コマンドラインから node-red を呼び出すだけです。初めてフローを作成する場合は、空のワークスペースが表示されます。

以下の手順に従ってフローを作成するか、完成したフローをダウンロードしてウォークスルーを参照してください。

SQLite のアカウントデータを取得

inject ノードをワークスペースにドラッグします。これがフローを開始するためのノードです。ノードをダブルクリックして名前を付けます。次に、http request ノードをワークスペースにドラッグします。このノードは、API Server にリクエストを送信して、新しい SQLite アカウントがあるかどうかを確認します。ノードをダブルクリックして、ノードのプロパティを設定します。この場合、ノードは SQLite Account テーブルのエンドポイントに HTTP GET リクエストを送信します(増分検索の場合は、最後にアカウントを検索した時刻に基づいて $filter パラメータを URL に追加します)。セキュリティのため、API Server 内で作成したユーザー(Users)に基づいて基本認証を使用しています。2つのノードをワイヤで接続します。

SQLite アカウントの処理

次に、json ノードと2つの function ノードをダッシュボードに配置します。json ノードは、API Server からのレスポンスを JSON オブジェクトに解析するために使用します。2つの function ノードは、HTTP リクエストの結果をフローの残りの部分に1つずつ渡していきます。

最初の function ノードには2つの出力があります:msg.payload の値の配列から最初の結果と、残りの値を含む配列です。ノードをダブルクリックして、以下の JavaScript を function にコピーします:
var arrLen = msg.payload.value.length;
if(arrLen > 1) {
    var msg2 = {};
    msg2.payload = msg.payload.value.slice(1,arrLen);
    msg.payload = msg.payload.value[0];
    return [msg,msg2];
} else if (arrLen == 1) {
    msg.payload = msg.payload.value[0];
    // API Server のレスポンスヘッダーを削除
    msg.headers = {};
    return [msg, null];
}
return [null, null];

2番目の function は、残りの値の配列を前の function の入力に一致するメッセージに再構成します。これにより、SQLite アカウントの HTTP リクエストの各結果を1つずつ処理できます。ノードをダブルクリックして、以下の JavaScript を function にコピーします:

var payload = msg.payload;
msg.payload = {}
msg.payload.value = payload;
return msg;

json ノードの出力から最初の新しい function ノード(SplitFirstResult)の入力へ、SplitFirstResult の2番目の出力から2番目の新しい function ノード(SendOtherResultsBackThrough)の入力へ、SendOtherResultsBackThrough の出力から SplitFirstResult の入力へ、それぞれワイヤを作成します。

Snowflake の既存アカウントを検索

別の function ノードと別の http request ノードをワークスペースにドラッグします。function ノードは、SQLite Account の情報を msg.payload から msg.sqliteResult に移動します。以下の JavaScript を function にコピーします:

msg.headers = {};
msg.sqliteResult=msg.payload;
return msg;

http request ノードは、SQLite アカウントと同じ名前の既存アカウントを Snowflake で検索します。ノードの URL を Snowflake Account エンドポイントを指すように設定し、OData の $filter パラメータを使用して既存のアカウントを検索します(フィルター内の Name の周りに3重括弧があることに注意)。また、前の http request ノードと同様にユーザー名とパスワードを設定します。

フィルター付き Snowflake Accounts エンドポイント

http://localhost:8153/api.rsc/SNOW_CRM_Account/?$filter=(Id eq '{{{Id}}}')

SplitFirstResult function の最初の出力から新しい function(StoreSnowResult)の入力へ、StoreSnowResult の出力から新しい http request ノードへ、それぞれワイヤを作成します。

追加または更新の判断

次に、json ノード、switch ノード、2つの新しい function ノードをダッシュボードにドラッグします。json ノードは API Server からのレスポンスを解析するためにあり、switch は Snowflake で既存のアカウントが見つかったかどうかに基づいてフローを分岐させます:

最初の function ノード(AddSetup)は、次の HTTP リクエストに備えて既存のヘッダーをクリアし、追加フラグを true に設定するだけです。以下の JavaScript を function にコピーします:

if (msg.sqliteResult) {
  msg.headers = {};
  msg.add = true;
  return msg;
}
return null;

2番目の function ノード(UpdateSetup)は、既存のヘッダーをクリアし、既存のアカウントの ID を保存し、追加フラグを false に設定します。以下の JavaScript を function にコピーします:

if (msg.sqliteResult &&
    (typeof msg.payload.value[0].Id == 'string')) {
  msg.headers = {};
  msg.Id = msg.payload.value[0].Id ;
  msg.add = false;
  return msg;
}
return null;

http request ノードの出力から json ノードの入力へ、json ノードの出力から AddAccount switch ノードの入力へ、AddAccount の出力 1 と 2 からそれぞれ AddSetup と UpdateSetup の入力へ、ワイヤを作成します。

SQLite アカウントを Snowflake アカウントにマッピング

新しい function ノードをワークスペースにドラッグします。この function は、SQLite Account の関連フィールドを Snowflake Account にマッピングします。アカウントが新規の場合は Name が含まれ、それ以外の場合は無視されます。以下の JavaScript を新しい function ノード(AccountUpdate)にコピーします:

var up_account = {};
var account = msg.SQLiteResult;

if (account.Id && (msg.add))
  up_account.Id = account.Id;

if (account.Active__c)
  up_account.Active__c = account.Active__c;
if (account.BillingCity)
  up_account.BillingCity = account.BillingCity;
if (account.BillingCountry)
  up_account.BillingCountry = account.BillingCountry;
if (account.BillingState)
  up_account.BillingState = account.BillingState;
if (account.BillingLatitude)
  up_account.BillingLatitude = account.BillingLatitude;
if (account.BillingStreet)
  up_account.BillingStreet = account.BillingStreet;
if (account.BillingLongitude)
  up_account.BillingLongitude = account.BillingLongitude;
if (account.BillingPostalCode)
  up_account.BillingPostalCode = account.BillingPostalCode;
if (account.CurrencyIsoCode)
  up_account.CurrencyIsoCode = account.CurrencyIsoCode;
if (account.fax)
  up_account.fax = account.fax;
if (account.Phone)
  up_account.Phone = account.Phone;
if (account.ShippingCity)
  up_account.ShippingCity = account.ShippingCity;
if (account.ShippingCountry)
  up_account.ShippingCountry = account.ShippingCountry;
if (account.ShippingState)
  up_account.ShippingState = account.ShippingState;
if (account.ShippingLatitude)
  up_account.ShippingLatitude = account.ShippingLatitude;
if (account.ShippingStreet)
  up_account.ShippingStreet = account.ShippingStreet;
if (account.ShippingLongitude)
  up_account.ShippingLongitude = account.ShippingLongitude;
if (account.ShippingPostalCode)
  up_account.ShippingPostalCode = account.ShippingPostalCode;
if (account.Website)
  up_account.Website = account.Website;

msg.payload = up_account;
return msg;

AddSetup と UpdateSetup の出力から SQLiteToSnowflake の入力へワイヤを作成します:

新規アカウントの作成または既存アカウントの更新

新しい switch ノードと2つの新しい http request ノードをダッシュボードにドラッグします。switch ノードは、フローが新しい Snowflake アカウントを追加する HTTP リクエストを送信するか、既存のアカウントを更新する HTTP リクエストを送信するかを決定します。

追加または更新を決定するには、msg.add に基づいて switch ノードを設定します:

最初の http request ノードは、Snowflake に新しいアカウントを追加します。これを行うには、API Server の Snowflake Account エンドポイントに HTTP POST リクエストを送信し、msg.payload をリクエストボディとして使用します。

Snowflake アカウントを更新するには、API Server の Snowflake Account エンドポイントに HTTP PUT リクエストを送信し、msg.Id で更新するアカウントを指定します。同様に、msg.payload をリクエストボディとして使用します。

AccountMapping の出力から AddOrUpdate の入力へ、AddOrUpdate の最初の出力から追加に使用する http request ノードの入力へ、AddOrUpdate の2番目の出力から更新に使用する http request ノードの入力へ、それぞれワイヤを作成してフローを完成させます。

このフローは、API Server でサポートされている他のデータソースで動作するように簡単に変更できます。

詳細情報と無料トライアル



すべてのノードとワイヤを設定したら、フローをデプロイし、inject ノードをクリックしてフローを開始します(またはスケジュールを設定してフローを繰り返しトリガーします)。API Server の詳細については、ナレッジベースの記事を参照してください。30日間の無料トライアルをダウンロードするには、API Server の製品ページにアクセスしてください。ご質問がある場合は、サポートチームがお手伝いいたします。

始める準備はできましたか?

CData API Server の無料トライアルをダウンロード:

今すぐダウンロード