@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
5050 @ transient val sc : HiveContext = sqlContext.asInstanceOf [HiveContext ]
5151 @ transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
5252 @ transient private lazy val hiveContext = new Context (sc.hiveconf)
53- @ transient private lazy val db = Hive .get( sc.hiveconf)
53+ @ transient private lazy val catalog = sc.catalog
5454
5555 private def newSerializer (tableDesc : TableDesc ): Serializer = {
5656 val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf [Serializer ]
@@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
199199 orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(" " ))
200200 }
201201 val partVals = MetaStoreUtils .getPvals(table.hiveQlTable.getPartCols, partitionSpec)
202- db.validatePartitionNameCharacters(partVals)
202+ catalog.synchronized {
203+ catalog.client.validatePartitionNameCharacters(partVals)
204+ }
203205 // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
204206 // which is currently considered as a Hive native command.
205207 val inheritTableSpecs = true
206208 // TODO: Correctly set isSkewedStoreAsSubdir.
207209 val isSkewedStoreAsSubdir = false
208210 if (numDynamicPartitions > 0 ) {
209- db.loadDynamicPartitions(
210- outputPath,
211- qualifiedTableName,
212- orderedPartitionSpec,
213- overwrite,
214- numDynamicPartitions,
215- holdDDLTime,
216- isSkewedStoreAsSubdir
217- )
211+ catalog.synchronized {
212+ catalog.client.loadDynamicPartitions(
213+ outputPath,
214+ qualifiedTableName,
215+ orderedPartitionSpec,
216+ overwrite,
217+ numDynamicPartitions,
218+ holdDDLTime,
219+ isSkewedStoreAsSubdir
220+ }
218221 } else {
219- db.loadPartition(
222+ catalog.synchronized {
223+ catalog.client.loadPartition(
224+ outputPath,
225+ qualifiedTableName,
226+ orderedPartitionSpec,
227+ overwrite,
228+ holdDDLTime,
229+ inheritTableSpecs,
230+ isSkewedStoreAsSubdir)
231+ }
232+ }
233+ } else {
234+ catalog.synchronized {
235+ catalog.client.loadTable(
220236 outputPath,
221237 qualifiedTableName,
222- orderedPartitionSpec,
223238 overwrite,
224- holdDDLTime,
225- inheritTableSpecs,
226- isSkewedStoreAsSubdir)
239+ holdDDLTime)
227240 }
228- } else {
229- db.loadTable(
230- outputPath,
231- qualifiedTableName,
232- overwrite,
233- holdDDLTime)
234241 }
235242
236243 // Invalidate the cache.
0 commit comments