読者です 読者をやめる 読者になる 読者になる

/var/log/laughingman7743.log

I thought what I'd do was, I'd pretend I was one of those deaf-mutes or should I?

AmazonECSデプロイスクリプトをPythonで再実装してみた

AWS ECS Python

シェルスクリプト*1つらいってことでPythonで再実装してみました。基本的な使い方は変わらないです。(古いタスクが終了するまで待つとかなり時間がかかる場合があるので、forceフラグで新しいタスクのlastStatusがRUNNINGになったら古いタスクを強制的に止めるような処理もできるようにしてみました。この辺はサービススケジューラにまかせてしまったほうが無難な気が。。。)

click*2を使うとPythonCLIツールを作るのが捗りますね。
作者のmitsuhiko先生がyoutubeに上げている動画*3は必見です。

Enjoy!

PythonでRequestsを使ったSlackのAttachments通知でハマった

Python Slack

以下のようにattachmentsの中身をjosn.dumpsしてやればOK。

POST(application/x-www-form-urlencoded)で送っているので、ネストしたオブジェクトはJSON形式の文字列にする必要があります。 POSTでもContentTypeをapplication/jsonで、BodyをJSON形式で送るような形だと楽ですけどね。

Enjoy!

Kinesis + Lambdaの組み合わせが便利

AWS Kinesis Lambda

fluentd*1で出力先をいろいろと追加するよりは、一旦Kinesisに突っ込んでLambdaで処理した方がいろいろと捗る感じがします。
特に部署やチーム毎に欲しいログの形式、バケット、パスの切り方(時間単位 or 日単位 or サーバ単位 etc...)、出力先(S3、Kibana etc..)等が違ったりして、 もろもろ要件を満たすより、Kinesisのストリームを用意しておくからあとは各々で好きに処理してみたいな運用ができそうです。

以下はKinesisのストリームからS3に出力するようなLambdaの実装例です。
Lambdaのローカル環境での実行はpython-lambda-local*2、アップロードはlambda-uploader*3を使っています。

Kinesisは最大7日保持できるので安心感ありますね。Lambdaを使えばサーバレスにできるのも魅力です。 スケーラビリティもAWSにおんぶにだっこで、運用はかなり楽になるかと。
どちらにしてもKinesisに突っ込むのはfluendからだったりするんですけどね。。。

Enjoy!

Redshift便利スクリプト

AWS Redshift Python

AWSLabsのGithubリポジトリにいろいろと便利なスクリプトがあります。

日次でVacuum処理やAnalyze処理を夜間に走らせたいということで、 Analyze & Vacuum Schema Utility*1をAirflow*2で使いたいなと思いましたが、 クラス化されていなかったり、ログがファイルに出力されたりとイマイチ使い勝手が悪いのでオレオレリファクタリングしてみました。
クラス化して、PostgreSQLのライブラリをPyGreSQL*3からPsycopg2*4とSQLAlchemy*5の組み合わせに変更、 ログ出力は全てloggingに変更、エラーが発生した場合はExceptionを投げるような形に変更しています。

細かなパラメータが設定できて便利です。
その他、AdminViews*6等便利なスクリプトを公開してくれているので活用すると良いですね。

Enjoy!

Boto3でEMR

AWS EMR Python

最近HiveでETL処理とかするためにEMRをよくさわっています。Boto3からクラスタ起動したりステップ追加したりしています。 Boto2だとクラスタ作成時にセキュリティグループの設定等をapi_paramsに追加する形になり非常にわかりにくいです。 Boto3だとJSONライクな指定ができていろいろと捗ります。

日次でクラスタを起動してETL処理するようなスクリプトのサンプルです。AirflowでDAGを作って実行すると良いですね。

https://gist.github.com/laughingman7743/5c675c9b1d9ed02539e6

クラスタの起動

クラスタ起動時にステップも指定できますが、どうも追加される順序がバラバラだったりするので、起動してから順に追加するような形が良さそうです。

Hiveメタデータストアの指定

ConfigurationsにRDSの接続先を指定します。

HadoopJarステップ

以下はS3DistCpの処理をステップに追加するような例です。 Hiveは小さなファイルが大量にあると非常に処理に時間がかかったりするので、S3DistCpを使ってマージすると良いです。

HiveScriptステップ

Boto2のソース*1を参考にHadoopJarステップを組み立ててやるだけです。

クラスタのポーリング

全ステップが正常に完了した後に別の処理をしたい(例えばETL処理したデータをRedshiftに取り込む等)といった場合は、 クラスタ起動前の時刻を取得しておき、それ以降に起動したクラスタ一覧を取得し、ステータスを調べるようなことをすれば良さそうです。

自前でHadoopクラスタを運用するのはかなり大変なのでEMRを活用すると良いですね。 ETL処理時のみクラスタを起動し、処理が終われば落としておくような運用であればコストもかなり抑えられます。

Enjoy!

Elasticsearchに空間データを突っ込む

Elasticsearch Python GIS

例のごとく国土交通省のデータをダウンロードしてきてElasticsearchに突っ込んで検索してみます。

国土数値情報ダウンロードサービス

GDALのドキュメント*1を参照するとogr2ogrコマンドでシェープファイルを直接Elasticsearchに投入できそうですが、 マッピング定義をあれこれ変更したりしていろいろやってみましたがGeoShape形式での投入はできず。。。
C++で実装されているのでとりあえず深掘りするのはやめて、GeoJSON形式に変換してPythonスクリプトから投入することにします。

以下の国土交通省のページから日本全国の行政区域データをサウンロードしてきます。

国土数値情報 行政区域データの詳細

シェープファイルファイルのGeoJSON変換は以下のコマンドでできます。

変換するとFeatureCollectionという一塊のJSONオブジェクトとして出力されます。 Features配列内のFeatureオブジェクトを1レコードとして登録すると良さそうなのですが、一塊のJSONオブジェクトかつかなりのファイルサイズで、 そのままパースするとメモリがいくらあっても足りない感じです。JavaStAX的なストリーミング処理でJSONをパース出来ると良さそうですね。
PythonでそのようなJSONパーサーを探してみると、ijson*2というライブラリが見つかります。 これを使ってStAX的なJSONパース処理を書いてみました。メモリ使用量も抑えられて素敵ですね。 Elasticsearchへの投入処理はBulkAPI*3を利用します。

curlコマンドで以下のマッピング定義を登録、Pythonスクリプトを実行してデータを投入します。 投入先のインデックスはjp_city、typeはFeatureCollectionとしています。 マッピング定義は以下です。

投入が終わったら適当に日本のへそでも検索してみましょう。

問題なく検索できているようです。

最近ではKibanaがWMSに対応*4したりと、ElasticsearchでGIS的なことがやりやすくなっています。 クラスタが組みやすいので、PostgreSQL(PostGIS)よりも検索性能よかったりするかもしれません。 ElasticsearchをバックエンドにしたモダンなGISなんて素敵ですね。

Enjoy!

Playframework2.4 + Slick3.0で複数のデータベースに接続する

Playframework Slick Scala

@NamedDatabaseアノテーションをつければ対応するDB名のDatabaseConfigProviderをDIしてくれます*1が、 なんとなくDatabaseConfigProviderをDIしたくないなってことで、別の方法で実装してみた。
どちらにしてもPlay.currentに依存してるのであまり意味ないなと。。。

リポジトリは以下のようにDB名をDIする形。

DB名はモジュールを作ってDIする。

テスト時はテスト用のモジュールに入れ替えて、適当にインメモリのDBで済ませる。

テスト用のDB設定は設定ファイルでなく、Slick用のインメモリDB設定を作るヘルパーメソッドを用意しておくとテストケースごとにランダムなDBを利用できて良さそう。

build.sbtのjavaOptionsで、テスト時にテスト用の設定ファイルを読み込む設定を忘れずに。

設定ファイルは他の設定ファイルをincludeできるので、DB設定やモジュール設定等で分割して管理したほうがいろいろと捗る感じがします。

Enjoy!