diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java index c42e9936f47a..d14a48653ec4 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java @@ -148,7 +148,6 @@ private HCatConstants() { // restrict instantiation public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + ".dynamic.jobid"; public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false; - public static final String HCAT_DYNAMIC_CUSTOM_PATTERN = "hcat.dynamic.partitioning.custom.pattern"; // Message Bus related properties. public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat"; diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java index 5532b753c4d6..e3035afdddf4 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java @@ -23,16 +23,12 @@ import java.util.HashSet; import java.util.Map; import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; -public class HCatFileUtil { - - // regex of the form: ${column name}. 
Following characters are not allowed in column name: - // whitespace characters, /, {, }, \ - private static final Pattern customPathPattern = Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})"); +import static org.apache.hadoop.hive.metastore.utils.DynamicPartitioningCustomPattern.customPathPattern; +public class HCatFileUtil { // This method parses the custom dynamic path and replaces each occurrence // of column name within regex pattern with its corresponding value, if provided public static String resolveCustomPath(OutputJobInfo jobInfo, diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java index 9aaf67e40d84..76e62002567d 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java @@ -53,6 +53,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN; + /** The OutputFormat to use to write data to HCatalog. The key value is ignored and * should be given as null. 
The value is the HCatRecord to write.*/ @InterfaceAudience.Public @@ -163,7 +165,7 @@ public static void setOutput(Configuration conf, Credentials credentials, conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash); // if custom pattern is set in case of dynamic partitioning, configure custom path - String customPattern = conf.get(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN); + String customPattern = conf.get(HCAT_CUSTOM_DYNAMIC_PATTERN); if (customPattern != null) { HCatFileUtil.setCustomPath(customPattern, outputJobInfo); } diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java index 7bd750a9d5f7..eab851f27f2f 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java @@ -46,6 +46,7 @@ import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN; public class TestHCatDynamicPartitioned extends HCatMapReduceTest { @@ -125,7 +126,7 @@ protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask, generateWriteRecords(NUM_RECORDS, NUM_TOP_PARTITIONS, 0); HashMap properties = new HashMap(); if (customDynamicPathPattern != null) { - properties.put(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern); + properties.put(HCAT_CUSTOM_DYNAMIC_PATTERN, customDynamicPathPattern); } runMRCreate(null, dataColumns, writeRecords.subList(0,NUM_RECORDS/2), NUM_RECORDS/2, true, asSingleMapTask, properties); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java index 
4d73782aaee4..68a11cf178e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.SemanticException; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN; + /** * Analyzer for add partition commands. */ @@ -70,7 +72,7 @@ protected void analyzeCommand(TableName tableName, Map partition } AlterTableAddPartitionDesc desc = new AlterTableAddPartitionDesc(table.getDbName(), table.getTableName(), - ifNotExists, partitions); + ifNotExists, partitions, conf.get(HCAT_CUSTOM_DYNAMIC_PATTERN)); Task ddlTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)); rootTasks.add(ddlTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java index 231e3f14fcf5..de8f698709da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java @@ -178,12 +178,20 @@ public long getWriteId() { private ReplicationSpec replicationSpec = null; // TODO: make replicationSpec final too + private final String customPattern; + + public AlterTableAddPartitionDesc(String dbName, String tableName, boolean ifNotExists, + List partitions) { + this(dbName, tableName, ifNotExists, partitions, null); + } + public AlterTableAddPartitionDesc(String dbName, String tableName, boolean ifNotExists, - List partitions) { + List partitions, String customPattern) { this.dbName = dbName; this.tableName = tableName; this.ifNotExists = ifNotExists; this.partitions = partitions; + this.customPattern = 
customPattern; } @Explain(displayName = "db name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -241,4 +249,8 @@ public boolean mayNeedWriteId() { return true; } + public String getCustomPattern() { + return this.customPattern; + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionOperation.java index edfc9de7f190..2aed7301e564 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionOperation.java @@ -80,7 +80,7 @@ private long getWriteId(Table table) throws LockException { private List getPartitions(Table table, long writeId) throws HiveException { List partitions = new ArrayList<>(desc.getPartitions().size()); for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : desc.getPartitions()) { - Partition partition = convertPartitionSpecToMetaPartition(table, partitionDesc); + Partition partition = convertPartitionSpecToMetaPartition(table, partitionDesc, desc.getCustomPattern()); if (partition != null && writeId > 0) { partition.setWriteId(writeId); } @@ -91,7 +91,7 @@ private List getPartitions(Table table, long writeId) throws HiveExce } private Partition convertPartitionSpecToMetaPartition(Table table, - AlterTableAddPartitionDesc.PartitionDesc partitionSpec) throws HiveException { + AlterTableAddPartitionDesc.PartitionDesc partitionSpec, String customPattern) throws HiveException { Path location = partitionSpec.getLocation() != null ? 
new Path(table.getPath(), partitionSpec.getLocation()) : null; if (location != null) { // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) @@ -99,7 +99,7 @@ private Partition convertPartitionSpecToMetaPartition(Table table, } Partition partition = org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject( - table, partitionSpec.getPartSpec(), location); + table, partitionSpec.getPartSpec(), location, customPattern); if (partitionSpec.getPartParams() != null) { partition.setParameters(partitionSpec.getPartParams()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index a141aa2c5381..03bdebbfcbe3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN; public class FSTableEvent implements TableEvent { private final Path fromPathMetadata; @@ -179,7 +180,7 @@ private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDe sd.getBucketCols(), sd.getSortCols(), columnStatistics, writeId); AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), - tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); + tblDesc.getTableName(), true, ImmutableList.of(partitionDesc), hiveConf.get(HCAT_CUSTOM_DYNAMIC_PATTERN)); addPartitionDesc.setReplicationSpec(replicationSpec()); return addPartitionDesc; } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java index 
736e6e8c9f1a..8418df76601e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.regex.Matcher; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; @@ -51,6 +52,8 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.OutputFormat; +import static org.apache.hadoop.hive.metastore.utils.DynamicPartitioningCustomPattern.customPathPattern; + /** * A Hive Table Partition: is a fundamental storage unit within a Table. * @@ -119,11 +122,16 @@ public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp) * Thrown if we could not create the partition. */ public Partition(Table tbl, Map partSpec, Path location) throws HiveException { - initialize(tbl, createMetaPartitionObject(tbl, partSpec, location)); + initialize(tbl, createMetaPartitionObject(tbl, partSpec, location, null)); + } + + public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject( + Table tbl, Map partSpec, Path location) throws HiveException { + return createMetaPartitionObject(tbl, partSpec, location, null); } public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartitionObject( - Table tbl, Map partSpec, Path location) throws HiveException { + Table tbl, Map partSpec, Path location, String customPattern) throws HiveException { List pvals = new ArrayList(); for (FieldSchema field : tbl.getPartCols()) { String val = partSpec.get(field.getName()); @@ -142,6 +150,15 @@ public static org.apache.hadoop.hive.metastore.api.Partition createMetaPartition if (!tbl.isView()) { tpart.setSd(tbl.getSd().deepCopy()); + if (location == null && customPattern != null) { //should only happen for msck repair table with custom pattern + Matcher m = 
customPathPattern.matcher(customPattern); + StringBuffer sb = new StringBuffer(); + while (m.find()) { + m.appendReplacement(sb, partSpec.get(m.group(2))); + } + m.appendTail(sb); + location = new Path(tbl.getDataLocation(), sb.toString()); + } tpart.getSd().setLocation((location != null) ? location.toString() : null); } return tpart; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 4d4956fbec13..ab6620f46f83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -414,7 +415,8 @@ private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(P partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(), sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(), sd.getBucketCols(), sd.getSortCols(), null, writeId); - return new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); + return new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), true, ImmutableList.of(partitionDesc), + conf.get(MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN)); } private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java 
index a6188e1ad97e..7bed2ef10396 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -56,6 +57,8 @@ import com.google.common.collect.Lists; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN; + /** * TestHiveMetaStoreChecker. * @@ -66,15 +69,22 @@ public class TestHiveMetaStoreChecker { private IMetaStoreClient msc; private FileSystem fs; private HiveMetaStoreChecker checker = null; + private HiveMetaStoreChecker customChecker = null; private final String catName = "hive"; private final String dbName = "testhivemetastorechecker_db"; private final String tableName = "testhivemetastorechecker_table"; + private final String externalTableName = "testhivemetastorechecker_table_external"; private final String partDateName = "partdate"; private final String partCityName = "partcity"; + private final String partIdName = "partid"; + + private final String partStateName = "partstate"; + private List partCols; + private List partColsExt; private List> parts; @Before @@ -92,10 +102,19 @@ public void setUp() throws Exception { SessionState ss = SessionState.start(conf); ss.initTxnMgr(conf); + HiveConf customConf = new HiveConf(conf); + customConf.set(HCAT_CUSTOM_DYNAMIC_PATTERN, "const/${partstate}/const2/${partid}-${partcity}"); + customChecker = new HiveMetaStoreChecker(msc, customConf); + partCols = new ArrayList<>(); partCols.add(new FieldSchema(partDateName, 
serdeConstants.STRING_TYPE_NAME, "")); partCols.add(new FieldSchema(partCityName, serdeConstants.STRING_TYPE_NAME, "")); + partColsExt = new ArrayList(); + partColsExt.add(new FieldSchema(partCityName, serdeConstants.STRING_TYPE_NAME, "")); + partColsExt.add(new FieldSchema(partStateName, serdeConstants.STRING_TYPE_NAME, "")); + partColsExt.add(new FieldSchema(partIdName, serdeConstants.STRING_TYPE_NAME, "")); + parts = new ArrayList<>(); Map part1 = new HashMap<>(); part1.put(partDateName, "2008-01-01"); @@ -342,6 +361,33 @@ public void testAddPartitionCompactedBase() throws Exception { assertEquals(200, result.getPartitionsNotInMs().iterator().next().getMaxTxnId()); } + @Test + public void testCustomPartitionPatternCheck() + throws HiveException, AlreadyExistsException, IOException, MetastoreException { + Table table = createTestExternalTable(); + Path tablePath = table.getDataLocation(); + fs = tablePath.getFileSystem(hive.getConf()); + fs.mkdirs(new Path(tablePath, "const/CO/const2/1-denver")); + fs.mkdirs(new Path(tablePath, "const/CA/const2/1-sanjose")); + fs.mkdirs(new Path(tablePath, "const/CA/const2/2-sanfrancisco")); + fs.mkdirs(new Path(tablePath, "const/IL/const2/1-chicago")); + fs.mkdirs(new Path(tablePath, "const/TX/const2/1-dallas")); + fs.mkdirs(new Path(tablePath, "const/TX/const2/2-austin")); + fs.mkdirs(new Path(tablePath, "const/NY/const2/1-newyork")); + fs.createNewFile(new Path(tablePath, "const/CO/const2/1-denver/datafile")); + fs.createNewFile(new Path(tablePath, "const/CA/const2/1-sanjose/datafile")); + fs.createNewFile(new Path(tablePath, "const/CA/const2/2-sanfrancisco/datafile")); + fs.createNewFile(new Path(tablePath, "const/IL/const2/1-chicago/datafile")); + fs.createNewFile(new Path(tablePath, "const/TX/const2/1-dallas/datafile")); + fs.createNewFile(new Path(tablePath, "const/TX/const2/2-austin/datafile")); + fs.createNewFile(new Path(tablePath, "const/NY/const2/1-newyork/datafile")); + CheckResult result = 
customChecker.checkMetastore(null, dbName, externalTableName, null, null); + assertEquals(7, result.getPartitionsNotInMs().size()); + //cleanup + hive.dropTable(dbName, externalTableName); + fs.delete(tablePath, true); + } + @Test public void testAddPartitionMMBase() throws Exception { Table table = createTestTable(true); @@ -812,6 +858,23 @@ private Table createTestTable(boolean transactional) throws HiveException, Alrea } return table; } + + private Table createTestExternalTable() throws AlreadyExistsException, HiveException { + Database db = new Database(); + db.setName(dbName); + hive.createDatabase(db, true); + + Table table = new Table(dbName, externalTableName); + table.setDbName(dbName); + table.setInputFormatClass(TextInputFormat.class); + table.setOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class); + table.setPartCols(partColsExt); + + hive.createTable(table); + table = hive.getTable(dbName, externalTableName); + table.setTableType(TableType.EXTERNAL_TABLE); + return table; + } private Table createNonPartitionedTable() throws Exception { Database db = new Database(); db.setName(dbName); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java index 11dc51b827ad..a2dfcf141647 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java @@ -30,6 +30,7 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionColtoTypeMap; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionsByProjectSpec; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath; +import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.HCAT_CUSTOM_DYNAMIC_PATTERN; import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned; import java.io.IOException; @@ -50,6 +51,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.regex.Pattern; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; @@ -61,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest; import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec; +import org.apache.hadoop.hive.metastore.utils.DynamicPartitioningCustomPattern; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.MetastoreException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -574,14 +577,26 @@ private final class PathDepthInfoCallable implements Callable { private final ConcurrentLinkedQueue pendingPaths; private final boolean throwException; private final PathDepthInfo pd; + private final DynamicPartitioningCustomPattern compiledCustomPattern; + private final int maxDepth; private PathDepthInfoCallable(PathDepthInfo pd, List partColNames, FileSystem fs, - ConcurrentLinkedQueue basePaths) { + ConcurrentLinkedQueue basePaths, String customPattern) { this.partColNames = partColNames; this.pd = pd; this.fs = fs; this.pendingPaths = basePaths; this.throwException = "throw".equals(MetastoreConf.getVar(conf, MetastoreConf.ConfVars.MSCK_PATH_VALIDATION)); + if (customPattern != null) { + String[] parts = customPattern.split(Path.SEPARATOR); + this.compiledCustomPattern = new DynamicPartitioningCustomPattern.Builder() + .setCustomPattern(customPattern) + .build(); + this.maxDepth = parts.length; + } else { + this.compiledCustomPattern = null; + this.maxDepth = partColNames.size(); + } } @Override @@ -593,7 
+608,17 @@ private Path processPathDepthInfo(final PathDepthInfo pd) throws IOException, MetastoreException { final Path currentPath = pd.p; final int currentDepth = pd.depth; - if (currentDepth == partColNames.size()) { + if (currentDepth == maxDepth) { + if (compiledCustomPattern != null) { //validate path against pattern + Pattern fullPattern = compiledCustomPattern.getPartitionCapturePattern(); + String[] parts = currentPath.toString().split(Path.SEPARATOR); + String relPath = String.join(Path.SEPARATOR, + Arrays.copyOfRange(parts, parts.length - maxDepth, parts.length)); + if (!fullPattern.matcher(relPath).matches()) { + logOrThrowExceptionWithMsg("File path " + currentPath + " does not match custom partitioning pattern " + + compiledCustomPattern.getCustomPattern()); + } + } return currentPath; } List fileStatuses = new ArrayList<>(); @@ -605,7 +630,7 @@ private Path processPathDepthInfo(final PathDepthInfo pd) } // found no files under a sub-directory under table base path; it is possible that the table // is empty and hence there are no partition sub-directories created under base path - if (fileStatuses.size() == 0 && currentDepth > 0) { + if (fileStatuses.isEmpty() && currentDepth > 0 && currentDepth < maxDepth) { // since maxDepth is not yet reached, we are missing partition // columns in currentPath logOrThrowExceptionWithMsg( @@ -618,6 +643,12 @@ private Path processPathDepthInfo(final PathDepthInfo pd) logOrThrowExceptionWithMsg( "MSCK finds a file rather than a directory when it searches for " + fileStatus.getPath().toString()); + } else if (fileStatus.isDirectory() && compiledCustomPattern != null) { + //since this is a restricted custom pattern, it could be almost anything + //when we hit max depth we can perform a validation check of entire path against pattern, + //to avoid creating regexen at every depth + Path nextPath = fileStatus.getPath(); + pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1)); + } else { // found a sub-directory at
a depth less than number of partition keys // validate if the partition directory name matches with the corresponding @@ -677,7 +708,8 @@ void checkPartitionDirs(final ExecutorService executor, //process each level in parallel while(!nextLevel.isEmpty()) { futures.add( - executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue))); + executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue, + conf.get(HCAT_CUSTOM_DYNAMIC_PATTERN)))); } while(!futures.isEmpty()) { Path p = futures.poll().get(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/DynamicPartitioningCustomPattern.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/DynamicPartitioningCustomPattern.java new file mode 100644 index 000000000000..c777982bf7fb --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/DynamicPartitioningCustomPattern.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * When using msck repair table with custom partitioning patterns, we need to capture + * the partition keys from the pattern, and use those to construct a regex which will + * match the paths and extract the partition key values.*/ +public final class DynamicPartitioningCustomPattern { + + private final String customPattern; + private final Pattern partitionCapturePattern; + private final List partitionColumns; + + // regex of the form: ${column name}. Following characters are not allowed in column name: + // whitespace characters, /, {, }, \ + public static final Pattern customPathPattern = Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})"); + + private DynamicPartitioningCustomPattern(String customPattern, Pattern partitionCapturePattern, + List partitionColumns) { + this.customPattern = customPattern; + this.partitionCapturePattern = partitionCapturePattern; + this.partitionColumns = partitionColumns; + } + + /** + * @return stored custom pattern string + * */ + public String getCustomPattern() { + return customPattern; + } + + /** + * @return stored custom pattern regex matcher + * */ + public Pattern getPartitionCapturePattern() { + return partitionCapturePattern; + } + + /** + * @return list of partition key columns + * */ + public List getPartitionColumns() { + return partitionColumns; + } + + public static class Builder { + private String customPattern; + + public Builder setCustomPattern(String customPattern) { + this.customPattern = customPattern; + 
return this; + } + + /** + * Constructs the regex to match the partition values in a path based on the custom pattern. + * + * @return custom partition pattern matcher */ + public DynamicPartitioningCustomPattern build() { + StringBuffer sb = new StringBuffer(); + Matcher m = customPathPattern.matcher(customPattern); + List partColumns = new ArrayList<>(); + while (m.find()) { + m.appendReplacement(sb, "(.*)"); + partColumns.add(m.group(2)); + } + m.appendTail(sb); + return new DynamicPartitioningCustomPattern(customPattern, Pattern.compile(sb.toString()), partColumns); + } + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java index 1336f61ece4b..5e32beac8f0f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java @@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -136,6 +137,8 @@ public String apply(@Nullable String string) { private static final String DELEGATION_TOKEN_STORE_CLS = "hive.cluster.delegation.token.store.class"; + public static final String HCAT_CUSTOM_DYNAMIC_PATTERN = "hcat.dynamic.partitioning.custom.pattern"; + private static final char DOT = '.'; /** @@ -1610,41 +1613,65 @@ public static Partition getPartition(IMetaStoreClient msc, Table tbl, Map partCols, - Map partitionColToTypeMap, Configuration conf) { + Map partitionColToTypeMap, Configuration conf) + throws MetastoreException { StringBuilder result = null; + String 
customPattern = conf.get(HCAT_CUSTOM_DYNAMIC_PATTERN); Path currPath = partitionPath; LOG.debug("tablePath:" + tablePath + ", partCols: " + partCols); - - while (currPath != null && !tablePath.equals(currPath)) { - // format: partition=p_val - // Add only when table partition colName matches - String[] parts = currPath.getName().split("="); - if (parts.length > 0) { - if (parts.length != 2) { - LOG.warn(currPath.getName() + " is not a valid partition name"); - return result.toString(); - } - - // Since hive stores partitions keys in lower case, if the hdfs path contains mixed case, - // it should be converted to lower case - String partitionName = parts[0].toLowerCase(); - // Do not convert the partitionValue to lowercase - String partitionValue = parts[1]; - if (partCols.contains(partitionName)) { - String normalisedPartitionValue = getNormalisedPartitionValue(partitionValue, - partitionColToTypeMap.get(partitionName), conf); - if (normalisedPartitionValue == null) { - return null; + if (customPattern != null) { + DynamicPartitioningCustomPattern compiledCustomPattern = new DynamicPartitioningCustomPattern.Builder() + .setCustomPattern(customPattern) + .build(); + Pattern customPathPattern = compiledCustomPattern.getPartitionCapturePattern(); + //partition columns in order that they appear in the pattern + List patternPartCols = compiledCustomPattern.getPartitionColumns(); + //relative path starts after tablepath and the / afterwards + String relPath = partitionPath.toString().substring(tablePath.toString().length() + 1); + Matcher pathMatcher = customPathPattern.matcher(relPath); + boolean didMatch = pathMatcher.matches(); + if (!didMatch) { //partition path doesn't match the pattern, should have been detected at an earlier step + throw new MetastoreException("Path " + relPath + " doesn't match custom partition pattern " + + customPathPattern + ", partitionPathFull: " + partitionPath); + } + if (!patternPartCols.isEmpty()) { + result = new
StringBuilder(patternPartCols.get(0) + "=" + pathMatcher.group(1)); + } + for (int i = 1; i < patternPartCols.size(); i++) { + result.append(Path.SEPARATOR).append(patternPartCols.get(i)).append("=").append(pathMatcher.group(i+1)); + } + } else { + while (currPath != null && !tablePath.equals(currPath)) { + // format: partition=p_val + // Add only when table partition colName matches + String[] parts = currPath.getName().split("="); + if (parts.length > 0) { + if (parts.length != 2) { + LOG.warn(currPath.getName() + " is not a valid partition name"); + return result.toString(); } - if (result == null) { - result = new StringBuilder(partitionName + "=" + normalisedPartitionValue); - } else { - result.insert(0, partitionName + "=" + normalisedPartitionValue + Path.SEPARATOR); + + // Since hive stores partitions keys in lower case, if the hdfs path contains mixed case, + // it should be converted to lower case + String partitionName = parts[0].toLowerCase(); + // Do not convert the partitionValue to lowercase + String partitionValue = parts[1]; + if (partCols.contains(partitionName)) { + String normalisedPartitionValue = getNormalisedPartitionValue(partitionValue, + partitionColToTypeMap.get(partitionName), conf); + if (normalisedPartitionValue == null) { + return null; + } + if (result == null) { + result = new StringBuilder(partitionName + "=" + normalisedPartitionValue); + } else { + result.insert(0, partitionName + "=" + normalisedPartitionValue + Path.SEPARATOR); + } } } + currPath = currPath.getParent(); + LOG.debug("currPath=" + currPath); } - currPath = currPath.getParent(); - LOG.debug("currPath=" + currPath); } return (result == null) ? null : result.toString(); }