@@ -34,7 +34,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
3434 import testImplicits ._
3535
3636 def rddIdOf (tableName : String ): Int = {
37- val executedPlan = ctx .table(tableName).queryExecution.executedPlan
37+ val executedPlan = sqlContext .table(tableName).queryExecution.executedPlan
3838 executedPlan.collect {
3939 case InMemoryColumnarTableScan (_, _, relation) =>
4040 relation.cachedColumnBuffers.id
@@ -44,7 +44,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
4444 }
4545
4646 def isMaterialized (rddId : Int ): Boolean = {
47- ctx. sparkContext.env.blockManager.get(RDDBlockId (rddId, 0 )).nonEmpty
47+ sparkContext.env.blockManager.get(RDDBlockId (rddId, 0 )).nonEmpty
4848 }
4949
5050 test(" withColumn doesn't invalidate cached dataframe" ) {
@@ -69,153 +69,153 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
6969 test(" cache temp table" ) {
7070 testData.select(' key ).registerTempTable(" tempTable" )
7171 assertCached(sql(" SELECT COUNT(*) FROM tempTable" ), 0 )
72- ctx .cacheTable(" tempTable" )
72+ sqlContext .cacheTable(" tempTable" )
7373 assertCached(sql(" SELECT COUNT(*) FROM tempTable" ))
74- ctx .uncacheTable(" tempTable" )
74+ sqlContext .uncacheTable(" tempTable" )
7575 }
7676
7777 test(" unpersist an uncached table will not raise exception" ) {
78- assert(None == ctx .cacheManager.lookupCachedData(testData))
78+ assert(None == sqlContext .cacheManager.lookupCachedData(testData))
7979 testData.unpersist(blocking = true )
80- assert(None == ctx .cacheManager.lookupCachedData(testData))
80+ assert(None == sqlContext .cacheManager.lookupCachedData(testData))
8181 testData.unpersist(blocking = false )
82- assert(None == ctx .cacheManager.lookupCachedData(testData))
82+ assert(None == sqlContext .cacheManager.lookupCachedData(testData))
8383 testData.persist()
84- assert(None != ctx .cacheManager.lookupCachedData(testData))
84+ assert(None != sqlContext .cacheManager.lookupCachedData(testData))
8585 testData.unpersist(blocking = true )
86- assert(None == ctx .cacheManager.lookupCachedData(testData))
86+ assert(None == sqlContext .cacheManager.lookupCachedData(testData))
8787 testData.unpersist(blocking = false )
88- assert(None == ctx .cacheManager.lookupCachedData(testData))
88+ assert(None == sqlContext .cacheManager.lookupCachedData(testData))
8989 }
9090
9191 test(" cache table as select" ) {
9292 sql(" CACHE TABLE tempTable AS SELECT key FROM testData" )
9393 assertCached(sql(" SELECT COUNT(*) FROM tempTable" ))
94- ctx .uncacheTable(" tempTable" )
94+ sqlContext .uncacheTable(" tempTable" )
9595 }
9696
9797 test(" uncaching temp table" ) {
9898 testData.select(' key ).registerTempTable(" tempTable1" )
9999 testData.select(' key ).registerTempTable(" tempTable2" )
100- ctx .cacheTable(" tempTable1" )
100+ sqlContext .cacheTable(" tempTable1" )
101101
102102 assertCached(sql(" SELECT COUNT(*) FROM tempTable1" ))
103103 assertCached(sql(" SELECT COUNT(*) FROM tempTable2" ))
104104
105105 // Is this valid?
106- ctx .uncacheTable(" tempTable2" )
106+ sqlContext .uncacheTable(" tempTable2" )
107107
108108 // Should this be cached?
109109 assertCached(sql(" SELECT COUNT(*) FROM tempTable1" ), 0 )
110110 }
111111
112112 test(" too big for memory" ) {
113113 val data = " *" * 1000
114- ctx. sparkContext.parallelize(1 to 200000 , 1 ).map(_ => BigData (data)).toDF()
114+ sparkContext.parallelize(1 to 200000 , 1 ).map(_ => BigData (data)).toDF()
115115 .registerTempTable(" bigData" )
116- ctx .table(" bigData" ).persist(StorageLevel .MEMORY_AND_DISK )
117- assert(ctx .table(" bigData" ).count() === 200000L )
118- ctx .table(" bigData" ).unpersist(blocking = true )
116+ sqlContext .table(" bigData" ).persist(StorageLevel .MEMORY_AND_DISK )
117+ assert(sqlContext .table(" bigData" ).count() === 200000L )
118+ sqlContext .table(" bigData" ).unpersist(blocking = true )
119119 }
120120
121121 test(" calling .cache() should use in-memory columnar caching" ) {
122- ctx .table(" testData" ).cache()
123- assertCached(ctx .table(" testData" ))
124- ctx .table(" testData" ).unpersist(blocking = true )
122+ sqlContext .table(" testData" ).cache()
123+ assertCached(sqlContext .table(" testData" ))
124+ sqlContext .table(" testData" ).unpersist(blocking = true )
125125 }
126126
127127 test(" calling .unpersist() should drop in-memory columnar cache" ) {
128- ctx .table(" testData" ).cache()
129- ctx .table(" testData" ).count()
130- ctx .table(" testData" ).unpersist(blocking = true )
131- assertCached(ctx .table(" testData" ), 0 )
128+ sqlContext .table(" testData" ).cache()
129+ sqlContext .table(" testData" ).count()
130+ sqlContext .table(" testData" ).unpersist(blocking = true )
131+ assertCached(sqlContext .table(" testData" ), 0 )
132132 }
133133
134134 test(" isCached" ) {
135- ctx .cacheTable(" testData" )
135+ sqlContext .cacheTable(" testData" )
136136
137- assertCached(ctx .table(" testData" ))
138- assert(ctx .table(" testData" ).queryExecution.withCachedData match {
137+ assertCached(sqlContext .table(" testData" ))
138+ assert(sqlContext .table(" testData" ).queryExecution.withCachedData match {
139139 case _ : InMemoryRelation => true
140140 case _ => false
141141 })
142142
143- ctx .uncacheTable(" testData" )
144- assert(! ctx .isCached(" testData" ))
145- assert(ctx .table(" testData" ).queryExecution.withCachedData match {
143+ sqlContext .uncacheTable(" testData" )
144+ assert(! sqlContext .isCached(" testData" ))
145+ assert(sqlContext .table(" testData" ).queryExecution.withCachedData match {
146146 case _ : InMemoryRelation => false
147147 case _ => true
148148 })
149149 }
150150
151151 test(" SPARK-1669: cacheTable should be idempotent" ) {
152- assume(! ctx .table(" testData" ).logicalPlan.isInstanceOf [InMemoryRelation ])
152+ assume(! sqlContext .table(" testData" ).logicalPlan.isInstanceOf [InMemoryRelation ])
153153
154- ctx .cacheTable(" testData" )
155- assertCached(ctx .table(" testData" ))
154+ sqlContext .cacheTable(" testData" )
155+ assertCached(sqlContext .table(" testData" ))
156156
157157 assertResult(1 , " InMemoryRelation not found, testData should have been cached" ) {
158- ctx .table(" testData" ).queryExecution.withCachedData.collect {
158+ sqlContext .table(" testData" ).queryExecution.withCachedData.collect {
159159 case r : InMemoryRelation => r
160160 }.size
161161 }
162162
163- ctx .cacheTable(" testData" )
163+ sqlContext .cacheTable(" testData" )
164164 assertResult(0 , " Double InMemoryRelations found, cacheTable() is not idempotent" ) {
165- ctx .table(" testData" ).queryExecution.withCachedData.collect {
165+ sqlContext .table(" testData" ).queryExecution.withCachedData.collect {
166166 case r @ InMemoryRelation (_, _, _, _, _ : InMemoryColumnarTableScan , _) => r
167167 }.size
168168 }
169169
170- ctx .uncacheTable(" testData" )
170+ sqlContext .uncacheTable(" testData" )
171171 }
172172
173173 test(" read from cached table and uncache" ) {
174- ctx .cacheTable(" testData" )
175- checkAnswer(ctx .table(" testData" ), testData.collect().toSeq)
176- assertCached(ctx .table(" testData" ))
174+ sqlContext .cacheTable(" testData" )
175+ checkAnswer(sqlContext .table(" testData" ), testData.collect().toSeq)
176+ assertCached(sqlContext .table(" testData" ))
177177
178- ctx .uncacheTable(" testData" )
179- checkAnswer(ctx .table(" testData" ), testData.collect().toSeq)
180- assertCached(ctx .table(" testData" ), 0 )
178+ sqlContext .uncacheTable(" testData" )
179+ checkAnswer(sqlContext .table(" testData" ), testData.collect().toSeq)
180+ assertCached(sqlContext .table(" testData" ), 0 )
181181 }
182182
183183 test(" correct error on uncache of non-cached table" ) {
184184 intercept[IllegalArgumentException ] {
185- ctx .uncacheTable(" testData" )
185+ sqlContext .uncacheTable(" testData" )
186186 }
187187 }
188188
189189 test(" SELECT star from cached table" ) {
190190 sql(" SELECT * FROM testData" ).registerTempTable(" selectStar" )
191- ctx .cacheTable(" selectStar" )
191+ sqlContext .cacheTable(" selectStar" )
192192 checkAnswer(
193193 sql(" SELECT * FROM selectStar WHERE key = 1" ),
194194 Seq (Row (1 , " 1" )))
195- ctx .uncacheTable(" selectStar" )
195+ sqlContext .uncacheTable(" selectStar" )
196196 }
197197
198198 test(" Self-join cached" ) {
199199 val unCachedAnswer =
200200 sql(" SELECT * FROM testData a JOIN testData b ON a.key = b.key" ).collect()
201- ctx .cacheTable(" testData" )
201+ sqlContext .cacheTable(" testData" )
202202 checkAnswer(
203203 sql(" SELECT * FROM testData a JOIN testData b ON a.key = b.key" ),
204204 unCachedAnswer.toSeq)
205- ctx .uncacheTable(" testData" )
205+ sqlContext .uncacheTable(" testData" )
206206 }
207207
208208 test(" 'CACHE TABLE' and 'UNCACHE TABLE' SQL statement" ) {
209209 sql(" CACHE TABLE testData" )
210- assertCached(ctx .table(" testData" ))
210+ assertCached(sqlContext .table(" testData" ))
211211
212212 val rddId = rddIdOf(" testData" )
213213 assert(
214214 isMaterialized(rddId),
215215 " Eagerly cached in-memory table should have already been materialized" )
216216
217217 sql(" UNCACHE TABLE testData" )
218- assert(! ctx .isCached(" testData" ), " Table 'testData' should not be cached" )
218+ assert(! sqlContext .isCached(" testData" ), " Table 'testData' should not be cached" )
219219
220220 eventually(timeout(10 seconds)) {
221221 assert(! isMaterialized(rddId), " Uncached in-memory table should have been unpersisted" )
@@ -224,37 +224,37 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
224224
225225 test(" CACHE TABLE tableName AS SELECT * FROM anotherTable" ) {
226226 sql(" CACHE TABLE testCacheTable AS SELECT * FROM testData" )
227- assertCached(ctx .table(" testCacheTable" ))
227+ assertCached(sqlContext .table(" testCacheTable" ))
228228
229229 val rddId = rddIdOf(" testCacheTable" )
230230 assert(
231231 isMaterialized(rddId),
232232 " Eagerly cached in-memory table should have already been materialized" )
233233
234- ctx .uncacheTable(" testCacheTable" )
234+ sqlContext .uncacheTable(" testCacheTable" )
235235 eventually(timeout(10 seconds)) {
236236 assert(! isMaterialized(rddId), " Uncached in-memory table should have been unpersisted" )
237237 }
238238 }
239239
240240 test(" CACHE TABLE tableName AS SELECT ..." ) {
241241 sql(" CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10" )
242- assertCached(ctx .table(" testCacheTable" ))
242+ assertCached(sqlContext .table(" testCacheTable" ))
243243
244244 val rddId = rddIdOf(" testCacheTable" )
245245 assert(
246246 isMaterialized(rddId),
247247 " Eagerly cached in-memory table should have already been materialized" )
248248
249- ctx .uncacheTable(" testCacheTable" )
249+ sqlContext .uncacheTable(" testCacheTable" )
250250 eventually(timeout(10 seconds)) {
251251 assert(! isMaterialized(rddId), " Uncached in-memory table should have been unpersisted" )
252252 }
253253 }
254254
255255 test(" CACHE LAZY TABLE tableName" ) {
256256 sql(" CACHE LAZY TABLE testData" )
257- assertCached(ctx .table(" testData" ))
257+ assertCached(sqlContext .table(" testData" ))
258258
259259 val rddId = rddIdOf(" testData" )
260260 assert(
@@ -266,15 +266,15 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
266266 isMaterialized(rddId),
267267 " Lazily cached in-memory table should have been materialized" )
268268
269- ctx .uncacheTable(" testData" )
269+ sqlContext .uncacheTable(" testData" )
270270 eventually(timeout(10 seconds)) {
271271 assert(! isMaterialized(rddId), " Uncached in-memory table should have been unpersisted" )
272272 }
273273 }
274274
275275 test(" InMemoryRelation statistics" ) {
276276 sql(" CACHE TABLE testData" )
277- ctx .table(" testData" ).queryExecution.withCachedData.collect {
277+ sqlContext .table(" testData" ).queryExecution.withCachedData.collect {
278278 case cached : InMemoryRelation =>
279279 val actualSizeInBytes = (1 to 100 ).map(i => INT .defaultSize + i.toString.length + 4 ).sum
280280 assert(cached.statistics.sizeInBytes === actualSizeInBytes)
@@ -283,46 +283,48 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
283283
284284 test(" Drops temporary table" ) {
285285 testData.select(' key ).registerTempTable(" t1" )
286- ctx.table(" t1" )
287- ctx.dropTempTable(" t1" )
288- assert(intercept[RuntimeException ](ctx.table(" t1" )).getMessage.startsWith(" Table Not Found" ))
286+ sqlContext.table(" t1" )
287+ sqlContext.dropTempTable(" t1" )
288+ assert(
289+ intercept[RuntimeException ](sqlContext.table(" t1" )).getMessage.startsWith(" Table Not Found" ))
289290 }
290291
291292 test(" Drops cached temporary table" ) {
292293 testData.select(' key ).registerTempTable(" t1" )
293294 testData.select(' key ).registerTempTable(" t2" )
294- ctx .cacheTable(" t1" )
295+ sqlContext .cacheTable(" t1" )
295296
296- assert(ctx .isCached(" t1" ))
297- assert(ctx .isCached(" t2" ))
297+ assert(sqlContext .isCached(" t1" ))
298+ assert(sqlContext .isCached(" t2" ))
298299
299- ctx.dropTempTable(" t1" )
300- assert(intercept[RuntimeException ](ctx.table(" t1" )).getMessage.startsWith(" Table Not Found" ))
301- assert(! ctx.isCached(" t2" ))
300+ sqlContext.dropTempTable(" t1" )
301+ assert(
302+ intercept[RuntimeException ](sqlContext.table(" t1" )).getMessage.startsWith(" Table Not Found" ))
303+ assert(! sqlContext.isCached(" t2" ))
302304 }
303305
304306 test(" Clear all cache" ) {
305307 sql(" SELECT key FROM testData LIMIT 10" ).registerTempTable(" t1" )
306308 sql(" SELECT key FROM testData LIMIT 5" ).registerTempTable(" t2" )
307- ctx .cacheTable(" t1" )
308- ctx .cacheTable(" t2" )
309- ctx .clearCache()
310- assert(ctx .cacheManager.isEmpty)
309+ sqlContext .cacheTable(" t1" )
310+ sqlContext .cacheTable(" t2" )
311+ sqlContext .clearCache()
312+ assert(sqlContext .cacheManager.isEmpty)
311313
312314 sql(" SELECT key FROM testData LIMIT 10" ).registerTempTable(" t1" )
313315 sql(" SELECT key FROM testData LIMIT 5" ).registerTempTable(" t2" )
314- ctx .cacheTable(" t1" )
315- ctx .cacheTable(" t2" )
316+ sqlContext .cacheTable(" t1" )
317+ sqlContext .cacheTable(" t2" )
316318 sql(" Clear CACHE" )
317- assert(ctx .cacheManager.isEmpty)
319+ assert(sqlContext .cacheManager.isEmpty)
318320 }
319321
320322 test(" Clear accumulators when uncacheTable to prevent memory leaking" ) {
321323 sql(" SELECT key FROM testData LIMIT 10" ).registerTempTable(" t1" )
322324 sql(" SELECT key FROM testData LIMIT 5" ).registerTempTable(" t2" )
323325
324- ctx .cacheTable(" t1" )
325- ctx .cacheTable(" t2" )
326+ sqlContext .cacheTable(" t1" )
327+ sqlContext .cacheTable(" t2" )
326328
327329 sql(" SELECT * FROM t1" ).count()
328330 sql(" SELECT * FROM t2" ).count()
@@ -331,8 +333,8 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
331333
332334 Accumulators .synchronized {
333335 val accsSize = Accumulators .originals.size
334- ctx .uncacheTable(" t1" )
335- ctx .uncacheTable(" t2" )
336+ sqlContext .uncacheTable(" t1" )
337+ sqlContext .uncacheTable(" t2" )
336338 assert((accsSize - 2 ) == Accumulators .originals.size)
337339 }
338340 }
0 commit comments