メインコンテンツへスキップ

はじめに

Amazon Lambda関数とDomo JSON Webhook DataSetを使用して、Amazon Kinesis StreamにプッシュされているデータをDomo DataSetに書き込むことができます。
kinesis_1.png

必要条件

Kinesis StreamからDomoにデータを書き込むには、以下が必要です。
  • Kinesisレコード1つにつき1つのJSONオブジェクトを持つ、JSONフォーマットのデータを受信するKinesis Stream
  • アクティブなDomoインスタンス

ステップ1 – JSON Webhook DataSetを作成する

KinesisにプッシュされたデータがJSON Webhook DataSetの中にコピーされます。以下の指示に従ってDataSetを作成します。
  1. Domo Appstoreで、JSON Webhookコネクターを探してクリックします。
  2. [Get the Data] をクリックします。
  3. [Webhook URL] フィールドの中のリンクをコピーし、保存します(このフィールドは変更できません)。
    Webhook URL_[Details]セクション
  4. (オプション)データがWebhookに送信されるときにWebhookに対するシークレットの認証情報が要求されるようにしたい場合は、[Webhook Secret] フィールドに入力してください(以降のステップに対しては、Webhookシークレットが使用されていないと仮定します)。
  5. DataSetを追加にするか、置き換えにするかを選択します。デフォルトでは追加に設定されており、これは一般にKinesisユーザーが必要とするものです。
  6. [Next] をクリックします。
  7. DataSetに名前を付け、[Save] をクリックします。
DataSetが実行されるように見えますが、データは登録されません。JSONデータがWebhook URLに送信された後のみ、データが含まれます。システムは自動的に、15分ごとに新しいデータがあるかどうかチェックします。 [Run DataSet] を選択しても、プロセスは速くなりません。

ステップ2 – Amazon Lambda関数を作成する

Kinesis StreamからWebhookの中にデータを送信するには、Amazon Lambda関数を使用します。これは、新しいデータがKinesis Streamの中にプッシュされるときは常に呼び出される小さなJavaScript関数です。この関数はすべての新しいKinesisレコードを1つのJSON配列の中に統合し、そのデータをWebhookに送信します。各レコードがDomo DataSetの中の単独の行になります 以下の指示に従ってLambda関数を作成します。
  1. AWSマネジメントコンソールでLambdaサービスを選択します(実際のAWSのUIとは少し異なる場合があります)。
    Lambda
  2. 左側のメニューで、[Functions] を選択します。
    kinesis_4.png
  3. [Create function] をクリックします。
    kinesis_5.png
  4. フィールドに記入し、最初から関数を作成します。
    • [Function name] – 関数に説明的な、分かりやすい名前を付けます。
    • [Runtime] – 関数が使用する言語を選択します。このチュートリアルではNode.js 20.xを使用しています。Java、C#、Go、Python、またはRubyを使用して関数を作成することも可能です。
    • [Role] – [Choose an existing role] を選択します。AWS管理者と協力し、この関数に適した権限が存在するようにしてください。権限はKinesisを読み取り、(ロギングのために)CloudWatchに書き込み、HTTPSコールを行うことができなければなりません。
      関数を作成する_最初から作成する
  5. [Create function] をクリックします。
  6. Lambda関数が作成されたら、画面の左側にあるメニューから [Kinesis] を選択し、Kinesisをトリガーとして関数に追加します。
    トリガーを追加する_Kinesis2Webhook
  7. 必要なフィールドに入力して、トリガーを設定します。一般的に、デフォルト値が許容されます。
    • [Kinesis stream] – この関数をトリガーするために使用するStreamを選択します。これは、Domoにコピーするデータを取得するStreamです。
    • [Consumer] – これは、[No consumer] のままにしておきます。
    • [Batch Size] – Kinesis Streamが扱う必要のあるデータ量にもとづいて調整することができます。最初は100にしておきます。
    • [Starting Position] – [Latest] に設定します。これにより、関数にはKinesis Streamの直近のデータが与えられます。
    • [Activate Trigger] – 必ずこのボックスにチェックマークを入れてください。
      トリガー設定
  8. [Add] をクリックします。
  9. 関数の名前をクリックして関数コードパネルを表示します。
    kinesis_9.png
  10. コードフィールドを次のように設定します。
    • [Code entry type] – [Edit code inline]
    • [Runtime] – [Node.js 20.x]
    • [Handler] – [index.handler]
  11. コードウィンドウはindex.js という名のファイルを表示するはずです。そのファイルの中のすべてのコードを次のように置換します。 /**
    * Stream data from AWS Kinesis to Domo via Webhook
    */
    'use strict';
    const https = require('https');
    // This is called when the Lambda function is triggered.
    // It should be configured to be triggered by data in a Kinesis Stream
    exports.handler = (event, context, callback) => {
    let count = 0;
    // Build an array of all of the JSON objects that came in from Kinesis
    let collectedData = [];
    event.Records.forEach((record) => {
    // Kinesis data is base64 encoded so decode here
    const data = new Buffer(record.kinesis.data, 'base64').toString('ascii');
    // console.log('Decoded payload:', JSON.stringify(data, null, 2));
    console.log('Decoded payload:', data);
    // Assumes that each Kinesis record contains only 1 JSON object
    let item = JSON.parse(data);
    collectedData[count] = item;
    count += 1;
    });
    // The array of objects will be passed to the webhook in a single POST call
    let postData = JSON.stringify(collectedData);
    let options = {
    host: process.env.WEBHOOK_HOST, // Should generally be " webhooks.domo.com "
    path: process.env.WEBHOOK_ENDPOINT, // Should be the rest of the webhook URL given by Domo
    method: 'POST',
    headers: {
    'Content-Type': 'application/json',
    'Content-Length': postData.length
    }
    };
    const req = https.request(options, (res) => {
    console.log('statusCode: ' + res.statusCode);
    res.on('data', (d) => \{
    console.log('data: ' + d);
    });
    });
    req.on('error', (e) => {
    console.error('*** ERROR ***');
    console.error(e);
    });
    req.write(postData);
    req.end();
    console.log('Pushed ' + count + ' record(s) to the webhook');
    };
  12. コードウィンドウの下に、2つの環境変数を追加します。
    • [WEBHOOK_HOST]– これは、この手順のステップ1で作成したDataSet内のWebhook URLのドメインの部分です。これは通常 webhooks.domo.com です。
    • [WEBHOOK_ENDPOINT]– これは、Webhook URLの「/api」以降のすべてです。これは非常に長い英数字の文字列になります。
      環境変数
  13. エディターの中のメニューから、[File]>[Save] を選択します。
  14. 画面の一番上で、[Save] をクリックし、Lambda関数に対して行ったすべての変更を保存します。
これで、データがKinesis Streamの中にプッシュされるときは、常にこのLambda関数が実行されます。関数は個々のKinesisレコードを取り、それらを1つのJSON配列に結合し、そのJSON配列をHTTPS POSTコールを介してWebhook DataSetに移します。これがデータをDomoにコピーします。