67 changes: 62 additions & 5 deletions aggregation/candlesticks/app.py
@@ -2,31 +2,66 @@

import flask
import pymongo
from pymongo import ASCENDING
import sys
import datetime

from flask import request

app = flask.Flask(__name__)

db = pymongo.Connection().demo
db.money.ensure_index("ts")
dbname = "BIGDATASpain"
db = pymongo.Connection()[dbname]
db["fiveMinBars"].ensure_index("ts")
db["minbars"].ensure_index("ts")


@app.route("/")
@app.route("/old")
def index():
    return flask.render_template("index.html")

@app.route("/")
@app.route("/min")
def one():
    return flask.render_template("fs.html", title="One Minute Chart")

@app.route("/sparked")
@app.route("/five")
def five():
    return flask.render_template("fs.html", title="Five Minute Chart")

@app.route("/fiveticks.json")
def fiveticks():
    # Get the page
    page = int(request.args.get('page', 1))
    collname = "fiveMinBars"
    return process_quotes(collname, page)

@app.route("/minticks.json")
def ticks():
    # Get the page
    page = int(request.args.get('page', 1))
    collname = "minbars"
    return process_quotes(collname, page)

def process_quotes(collname, page):
    limit = 150
    skip = page
    projection = {'_id': False}
    quotes = [format(x, 'Timestamp')
              for x in db[collname].find({}, projection=projection)
                                   .sort("Timestamp", ASCENDING)
                                   .skip(skip).limit(limit)]
    return JSONEncoder().encode(quotes)

@app.route("/money.json")
def money():

    # Get the page
    page = int(request.args.get('page', 1))
    limit = 6
    limit = 9
    skip = page

    candlesticks = db.command("aggregate", "money",
    """
    candlesticks = db.command("aggregate", collname,
        pipeline=[
            {"$project": {
                "minute": {
@@ -66,8 +101,30 @@ def money():
            }
        },
    ])["result"]
    """
    pipeline = [
        {"$project": {
            "bid": {"open": "$Open", "close": "$Close",
                    "high": "$High", "low": "$Low",
                    "avg": {"$avg": ["$High", "$Low"]}},
            "ts": "$Timestamp",
            "_id": "$Timestamp"
        }},
        {"$sort": {"ts": 1}},
        {"$skip": skip},
        {"$limit": limit},
    ]
    result = db.minbars.aggregate(pipeline)['result']

    candlesticks = [format(x) for x in result]
    print candlesticks
    return JSONEncoder().encode(candlesticks)

def format(x, k='_id'):
    x[k] = datetime.datetime.strptime(x[k], '%Y-%m-%d %H:%M')
    if k != '_id':
        x.pop('_id', None)
    return x

class JSONEncoder(flask.json.JSONEncoder):
    def default(self, obj):
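The body of default is collapsed in this diff view. A minimal sketch of what such an encoder needs to do for this app, assuming its only job is to serialize the datetime values produced by format() above (this is a hypothetical illustration, not the PR's actual implementation), could look like:

import datetime
import flask

class JSONEncoder(flask.json.JSONEncoder):
    def default(self, obj):
        # Render datetimes in the same 'YYYY-MM-DD HH:MM' form the charts expect;
        # fall back to Flask's default handling for anything else.
        if isinstance(obj, datetime.datetime):
            return obj.strftime('%Y-%m-%d %H:%M')
        return flask.json.JSONEncoder.default(self, obj)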
97,611 changes: 97,611 additions & 0 deletions aggregation/candlesticks/spark/mstf.csv

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions aggregation/candlesticks/spark/run.sh
@@ -0,0 +1,27 @@
#!/bin/sh
# set your hadoop location here
export HADOOP_PREFIX=/Users/norberto/sandbox/sparktalk/lib/hadoop-2.6.0
# need this to run local spark-shell
export SPARK_LOCAL_IP=127.0.0.1
# spark install
export SPARK_HOME=/Users/norberto/sandbox/sparktalk/lib/spark-1.4.0-bin-hadoop2.6

export PATH=$PATH:$SPARK_HOME/bin
export dbname=BIGDATASpain   # must match the database name used in script.scala and app.py
export collname=minbars
# import the mstf.csv file through mongoimport
mongoimport -d $dbname -c $collname --type csv --headerline mstf.csv

# dependencies: point dependencies_dir at the directory containing the following jars
# casbah-commons_2.10-2.8.2.jar
# casbah-core_2.10-2.8.0.jar
# casbah-query_2.10-2.8.0.jar
# mongo-java-driver-2.13.0.jar
# spark-mongodb-core - http://spark-packages.org/package/Stratio/spark-mongodb
export dependencies_dir=/Users/norberto/sandbox/sparktalk/lib
export dependencies_jars=$dependencies_dir/spark-mongodb-core-0.8.7.jar,$dependencies_dir/mongodb/casbah-commons_2.10-2.8.2.jar,$dependencies_dir/mongodb/casbah-core_2.10-2.8.0.jar,$dependencies_dir/mongodb/casbah-query_2.10-2.8.0.jar,$dependencies_dir/mongodb/mongo-java-driver-2.13.0.jar
# print a few imported documents as a sanity check
mongo --eval "db.$collname.find().limit(10).forEach(function(d){printjson(d)})" $dbname

# run spark-shell
spark-shell --jars $dependencies_jars -i script.scala
178 changes: 178 additions & 0 deletions aggregation/candlesticks/spark/script.scala
@@ -0,0 +1,178 @@
//We run this through spark-shell (see run.sh)
//Import the relevant packages and classes
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.provider._
import com.stratio.provider.mongodb._
import com.stratio.provider.mongodb.schema._
import com.stratio.provider.mongodb.writer._
import org.apache.spark.sql.hive.HiveContext
import MongodbConfig._

//Set the database we are going to fetch data from and write data to.
val dbname = "BIGDATASpain"
//Configure which database and collection to read from, with optional parameters too
val mcInputBuilder = MongodbConfigBuilder(Map(Host -> List("localhost:27017"),
Database -> dbname,
Collection -> "minbars",
SamplingRatio -> 1.0,
WriteConcern -> MongodbWriteConcern.Normal))
val readConfig = mcInputBuilder.build()

//HiveContext uses Hive's SQL parser, a superset of SQLContext's features (the window functions below require it in Spark 1.4), so we use it here
// See http://spark.apache.org/docs/1.4.0/sql-programming-guide.html#starting-point-sqlcontext for more info
val sqlContext = new HiveContext(sc) //sc is already defined as a SparkContext by the shell
val dfOneMin = sqlContext.fromMongoDB(readConfig) //set up the MongoDB collection to read from as a DataFrame
dfOneMin.registerTempTable("minbars") //make the table minbars available to the SQL expressions later


//This applies SQL window functions to partition the 1-minute bars into 5-minute windows,
// then keeps one row per complete window (the last 1-minute bar, where CloseTime - OpenTime = 4 minutes)
// carrying that window's open, high, low, close and summed volume
val dfFiveMinForMonth = sqlContext.sql(
"""
SELECT m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close, m.Volume
FROM
(SELECT

Symbol,
FIRST_VALUE(Timestamp)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as OpenTime,

LAST_VALUE(Timestamp)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as CloseTime,

FIRST_VALUE(Open)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Open,
MAX(High)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as High,

MIN(Low)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Low,
LAST_VALUE(Close)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Close,
SUM(Volume)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Volume
FROM minbars)
as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4"""
)

//Configure which collection we want to write to in MongoDB
val fiveMinOutputBuilder = MongodbConfigBuilder(Map(Host -> List("localhost:27017"), Database -> dbname, Collection -> "fiveMinBars", SamplingRatio -> 1.0, WriteConcern -> MongodbWriteConcern.Normal, SplitKey -> "_id", SplitSize -> 8))
val writeConfig = fiveMinOutputBuilder.build()

//Write the data to MongoDB - because of Spark's lazy evaluation, this is the step that actually triggers reading the 1-minute bars from MongoDB, running the query, and writing the 5-minute bars back to MongoDB
dfFiveMinForMonth.saveToMongodb(writeConfig)

//#################### Variant: the same 5-minute aggregation, restricted to July 2010 via the WHERE clause on Timestamp
val dfFiveMinForMonth = sqlContext.sql(
"""
SELECT m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close, m.Volume
FROM
(SELECT

Symbol,
FIRST_VALUE(Timestamp)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as OpenTime,

LAST_VALUE(Timestamp)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as CloseTime,

FIRST_VALUE(Open)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Open,
MAX(High)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as High,

MIN(Low)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Low,
LAST_VALUE(Close)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Close,
SUM(Volume)
OVER (
PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
ORDER BY Timestamp)

as Volume
FROM minbars
WHERE Timestamp >= '2010-07-01' AND Timestamp < '2010-08-01')
as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4"""
)
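For reference, the bucketing these window functions implement can be sketched in plain Python (a rough equivalent only, not part of the PR, assuming each 1-minute bar is a dict with the same Symbol/Timestamp/Open/High/Low/Close/Volume fields imported from mstf.csv; the fixed-width timestamp format makes lexicographic and chronological order coincide):

import datetime
from collections import defaultdict

FMT = '%Y-%m-%d %H:%M'
EPOCH = datetime.datetime(1970, 1, 1)

def to_unix(ts_str):
    # Seconds since the epoch for a 'YYYY-MM-DD HH:MM' timestamp string.
    return int((datetime.datetime.strptime(ts_str, FMT) - EPOCH).total_seconds())

def five_min_bars(minute_bars):
    # Bucket 1-minute bars by floor(unix_ts / 300), mirroring the
    # PARTITION BY floor(unix_timestamp(Timestamp)/(5*60)) clause above.
    buckets = defaultdict(list)
    for bar in minute_bars:
        buckets[to_unix(bar['Timestamp']) // 300].append(bar)

    out = []
    for bars in buckets.values():
        bars.sort(key=lambda b: b['Timestamp'])
        # Keep only complete windows: first and last bar 4 minutes apart,
        # matching the "= 60*4" filter in the WHERE clause.
        if to_unix(bars[-1]['Timestamp']) - to_unix(bars[0]['Timestamp']) != 60 * 4:
            continue
        out.append({
            'Symbol': bars[0]['Symbol'],
            'Timestamp': bars[0]['Timestamp'],  # OpenTime of the window
            'Open': bars[0]['Open'],
            'High': max(b['High'] for b in bars),
            'Low': min(b['Low'] for b in bars),
            'Close': bars[-1]['Close'],
            'Volume': sum(b['Volume'] for b in bars),
        })
    return sorted(out, key=lambda b: b['Timestamp'])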
53 changes: 53 additions & 0 deletions aggregation/candlesticks/static/css/fs.css
@@ -0,0 +1,53 @@
body {
font: 10px sans-serif;
}

.axis path,
.axis line {
fill: none;
stroke: #000;
shape-rendering: crispEdges;
}

path {
fill: none;
stroke-width: 1;
}

path.ohlc {
stroke: #000000;
stroke-width: 1;
}

path.ohlc.up {
stroke: #00AA00;
}

path.ohlc.down {
stroke: #FF0000;
}

.ma-0 path.line {
stroke: #1f77b4;
}

.ma-1 path.line {
stroke: #aec7e8;
}

path.volume {
fill: #EEEEEE;
}

.crosshair {
cursor: crosshair;
}

.crosshair path.wire {
stroke: #DDDDDD;
stroke-dasharray: 1, 1;
}

.crosshair .axisannotation path {
fill: #DDDDDD;
}