flattens) {
- forEach.setToBeFlattened(flattens);
- }
-
- /**
- * @return the forEach
- */
- public POOptimizedForEach getForEach() {
- return forEach;
- }
-
- /**
- * @param chunkSize - the chunk size for the biggest input
- */
- public void setChunkSize(long chunkSize) {
- this.chunkSize = chunkSize;
- }
-}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
deleted file mode 100644
index d6041741a3..0000000000
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.io.NullableUnknownWritable;
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.backend.hadoop.HDataType;
-
-/**
- * The package operator that packages the globally rearranged tuples
- * into output format as required by multi-query de-multiplexer.
- *
- * This operator is used when merging multiple Map-Reduce splittees
- * into a Map-only splitter during multi-query optimization.
- * The package operators of the reduce plans of the splittees form an
- * indexed package list inside this operator. When this operator
- * receives an input, it extracts the index from the key and calls the
- * corresponding package to get the output data.
- *
- * Due to the recursive nature of multi-query optimization, this operator
- * may be contained in another multi-query packager.
- *
- * The successor of this operator must be a PODemux operator which
- * knows how to consume the output of this operator.
- */
-public class POMultiQueryPackage extends POPackage {
-
- private static final long serialVersionUID = 1L;
-
- private static int idxPart = 0x7F;
-
- private List packages = new ArrayList();
-
- /**
- * If the POLocalRearranges corresponding to the reduce plans in
- * myPlans (the list of inner plans of the demux) have different key types
- * then the MultiQueryOptimizer converts all the keys to be of type tuple
- * by wrapping any non-tuple keys into Tuples (keys which are already tuples
- * are left alone).
- * The list below is a list of booleans indicating whether extra tuple wrapping
- * was done for the key in the corresponding POLocalRearranges and if we need
- * to "unwrap" the tuple to get to the key
- */
- private ArrayList isKeyWrapped = new ArrayList();
-
- /*
- * Indicating if all the inner plans have the same
- * map key type. If not, the keys passed in are
- * wrapped inside tuples and need to be extracted
- * out during the reduce phase
- */
- private boolean sameMapKeyType = true;
-
- /*
- * Indicating if this operator is in a combiner.
- * If not, this operator is in a reducer and the key
- * values must first be extracted from the tuple-wrap
- * before writing out to the disk
- */
- private boolean inCombiner = false;
-
- transient private PigNullableWritable myKey;
-
- /**
- * Constructs an operator with the specified key.
- *
- * @param k the operator key
- */
- public POMultiQueryPackage(OperatorKey k) {
- this(k, -1, null);
- }
-
- /**
- * Constructs an operator with the specified key
- * and degree of parallelism.
- *
- * @param k the operator key
- * @param rp the degree of parallelism requested
- */
- public POMultiQueryPackage(OperatorKey k, int rp) {
- this(k, rp, null);
- }
-
- /**
- * Constructs an operator with the specified key and inputs.
- *
- * @param k the operator key
- * @param inp the inputs that this operator will read data from
- */
- public POMultiQueryPackage(OperatorKey k, List inp) {
- this(k, -1, inp);
- }
-
- /**
- * Constructs an operator with the specified key,
- * degree of parallelism and inputs.
- *
- * @param k the operator key
- * @param rp the degree of parallelism requested
- * @param inp the inputs that this operator will read data from
- */
- public POMultiQueryPackage(OperatorKey k, int rp, List inp) {
- super(k, rp, inp);
- }
-
- @Override
- public String name() {
- return "MultiQuery Package [" + isKeyWrapped + "] - " + getOperatorKey().toString();
- }
-
- @Override
- public boolean supportsMultipleInputs() {
- return false;
- }
-
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
- v.visitMultiQueryPackage(this);
- }
-
- @Override
- public boolean supportsMultipleOutputs() {
- return false;
- }
-
- @Override
- public void attachInput(PigNullableWritable k, Iterator inp) {
- tupIter = inp;
- myKey = k;
- }
-
- @Override
- public void detachInput() {
- tupIter = null;
- myKey = null;
- }
-
- /**
- * Appends the specified package object to the end of
- * the package list.
- *
- * @param pack package to be appended to the list
- */
- public void addPackage(POPackage pack) {
- packages.add(pack);
- }
-
- /**
- * Appends the specified package object to the end of
- * the package list.
- *
- * @param pack package to be appended to the list
- * @param mapKeyType the map key type associated with the package
- */
- public void addPackage(POPackage pack, byte mapKeyType) {
- packages.add(pack);
- // if mapKeyType is already a tuple, we will NOT
- // be wrapping it in an extra tuple. If it is not
- // a tuple, we will wrap into in a tuple
- isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
- }
-
- /**
- * Returns the list of packages.
- *
- * @return the list of the packages
- */
- public List getPackages() {
- return packages;
- }
-
- /**
- * Constructs the output tuple from the inputs.
- *
- * The output is consumed by for the demultiplexer operator
- * (PODemux) in the format (key, {bag of tuples}) where key
- * is an indexed WritableComparable, not the wrapped value as a pig type.
- */
- @Override
- public Result getNextTuple() throws ExecException {
-
- byte origIndex = myKey.getIndex();
-
- int index = (int)origIndex;
- index &= idxPart;
-
- if (index >= packages.size() || index < 0) {
- int errCode = 2140;
- String msg = "Invalid package index " + index
- + " should be in the range between 0 and " + packages.size();
- throw new ExecException(msg, errCode, PigException.BUG);
- }
-
- POPackage pack = packages.get(index);
-
- // check to see if we need to unwrap the key. The keys may be
- // wrapped inside a tuple by LocalRearrange operator when jobs
- // with different map key types are merged
- PigNullableWritable curKey = myKey;
- if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
- Tuple tup = (Tuple)myKey.getValueAsPigType();
- curKey = HDataType.getWritableComparableTypes(tup.get(0), pack.getKeyType());
- curKey.setIndex(origIndex);
- }
-
- pack.attachInput(curKey, tupIter);
-
- Result res = pack.getNextTuple();
- pack.detachInput();
-
- Tuple tuple = (Tuple)res.result;
-
- // the object present in the first field
- // of the tuple above is the real data without
- // index information - this is because the
- // package above, extracts the real data out of
- // the PigNullableWritable object - we are going to
- // give this result tuple to a PODemux operator
- // which needs a PigNullableWritable first field so
- // it can figure out the index. Therefore we need
- // to add index to the first field of the tuple.
-
- Object obj = tuple.get(0);
- if (obj instanceof PigNullableWritable) {
- ((PigNullableWritable)obj).setIndex(origIndex);
- }
- else {
- PigNullableWritable myObj = null;
- if (obj == null) {
- myObj = new NullableUnknownWritable();
- myObj.setNull(true);
- }
- else {
- myObj = HDataType.getWritableComparableTypes(obj, HDataType.findTypeFromNullableWritable(curKey));
- }
- myObj.setIndex(origIndex);
- tuple.set(0, myObj);
- }
- // illustrator markup has been handled by "pack"
- return res;
- }
-
- /**
- * Returns the list of booleans that indicates if the
- * key needs to unwrapped for the corresponding plan.
- *
- * @return the list of isKeyWrapped boolean values
- */
- public List getIsKeyWrappedList() {
- return Collections.unmodifiableList(isKeyWrapped);
- }
-
- /**
- * Adds a list of IsKeyWrapped boolean values
- *
- * @param lst the list of boolean values to add
- */
- public void addIsKeyWrappedList(List lst) {
- for (Boolean b : lst) {
- isKeyWrapped.add(b);
- }
- }
-
- public void setInCombiner(boolean inCombiner) {
- this.inCombiner = inCombiner;
- }
-
- public boolean isInCombiner() {
- return inCombiner;
- }
-
- public void setSameMapKeyType(boolean sameMapKeyType) {
- this.sameMapKeyType = sameMapKeyType;
- }
-
- public boolean isSameMapKeyType() {
- return sameMapKeyType;
- }
-
-}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
deleted file mode 100644
index c200715e4a..0000000000
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.impl.util.Pair;
-
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.ReadOnceBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-
-/**
- * This package operator is a specialization
- * of POPackage operator used for the specific
- * case of the order by query. See JIRA 802
- * for more details.
- */
-public class POPackageLite extends POPackage {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public POPackageLite(OperatorKey k) {
- super(k, -1, null);
- }
-
- public POPackageLite(OperatorKey k, int rp) {
- super(k, rp, null);
- }
-
- public POPackageLite(OperatorKey k, List inp) {
- super(k, -1, inp);
- }
-
- public POPackageLite(OperatorKey k, int rp, List inp) {
- super(k, rp, inp);
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage#setNumInps(int)
- */
- @Override
- public void setNumInps(int numInps) {
- if(numInps != 1)
- {
- throw new RuntimeException("POPackageLite can only take 1 input");
- }
- this.numInputs = numInps;
- }
-
- public boolean[] getInner() {
- throw new RuntimeException("POPackageLite does not support getInner operation");
- }
-
- public void setInner(boolean[] inner) {
- throw new RuntimeException("POPackageLite does not support setInner operation");
- }
-
- /**
- * Make a deep copy of this operator.
- * @throws CloneNotSupportedException
- */
- @Override
- public POPackageLite clone() throws CloneNotSupportedException {
- POPackageLite clone = (POPackageLite)super.clone();
- clone.inner = null;
- clone.keyInfo = new HashMap>>();
- for (Entry>> entry: keyInfo.entrySet()) {
- clone.keyInfo.put(entry.getKey(), entry.getValue());
- }
- return clone;
- }
-
- /**
- * @return the distinct
- */
- @Override
- public boolean isDistinct() {
- throw new RuntimeException("POPackageLite does not support isDistinct operation");
- }
-
- /**
- * @param distinct the distinct to set
- */
- @Override
- public void setDistinct(boolean distinct) {
- throw new RuntimeException("POPackageLite does not support setDistinct operation");
- }
-
- /**
- * @return the isKeyTuple
- */
- public boolean getKeyTuple() {
- return isKeyTuple;
- }
-
- /**
- * @return the keyAsTuple
- */
- public Tuple getKeyAsTuple() {
- return keyAsTuple;
- }
-
- /**
- * @return the tupIter
- */
- public Iterator getTupIter() {
- return tupIter;
- }
-
- /**
- * @return the key
- */
- public Object getKey() {
- return key;
- }
-
- /**
- * Similar to POPackage.getNext except that
- * only one input is expected with index 0
- * and ReadOnceBag is used instead of
- * DefaultDataBag.
- */
- @Override
- public Result getNextTuple() throws ExecException {
- Tuple res;
- //Create numInputs bags
- ReadOnceBag db = null;
- db = new ReadOnceBag(this, tupIter, key);
- if(getReporter()!=null) {
- getReporter().progress();
- }
-
- //Construct the output tuple by appending
- //the key and all the above constructed bags
- //and return it.
- res = mTupleFactory.newTuple(numInputs+1);
- res.set(0,key);
- res.set(1,db);
- detachInput();
- Result r = new Result();
- r.returnStatus = POStatus.STATUS_OK;
- r.result = illustratorMarkup(null, res, 0);
- return r;
- }
-
- /**
- * Makes use of the superclass method, but this requires
- * an additional parameter key passed by ReadOnceBag.
- * key of this instance will be set to null in detachInput
- * call, but an instance of ReadOnceBag may have the original
- * key that it uses. Therefore this extra argument is taken
- * to temporarily set it before the call to the superclass method
- * and then restore it.
- */
- public Tuple getValueTuple(NullableTuple ntup, int index, Object key) throws ExecException {
- Object origKey = this.key;
- this.key = key;
- Tuple retTuple = super.getValueTuple(ntup, index);
- this.key = origKey;
- return retTuple;
- }
-
- @Override
- public String name() {
- return getAliasString() + "PackageLite" + "["
- + DataType.findTypeName(resultType) + "]" + "{"
- + DataType.findTypeName(keyType) + "}" + " - "
- + mKey.toString();
- }
-
- @Override
- public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- LineageTracer lineageTracer = illustrator.getLineage();
- lineageTracer.insert(tOut);
- if (illustrator.getEquivalenceClasses() == null) {
- LinkedList> equivalenceClasses = new LinkedList>();
- for (int i = 0; i < numInputs; ++i) {
- IdentityHashSet equivalenceClass = new IdentityHashSet();
- equivalenceClasses.add(equivalenceClass);
- }
- illustrator.setEquivalenceClasses(equivalenceClasses, this);
- }
- illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
- tOut.synthetic = false; // not expect this to be really used
- illustrator.addData((Tuple) tOut);
- return tOut;
- } else
- return (Tuple) out;
- }
-}
-
diff --git a/src/org/apache/pig/builtin/ParquetLoader.java b/src/org/apache/pig/builtin/ParquetLoader.java
index 76516e3b1c..1a245de688 100644
--- a/src/org/apache/pig/builtin/ParquetLoader.java
+++ b/src/org/apache/pig/builtin/ParquetLoader.java
@@ -27,7 +27,7 @@
import org.apache.pig.impl.util.JarManager;
/**
- * Wrapper class which will delegate calls to parquet.pig.ParquetLoader
+ * Wrapper class which will delegate calls to org.apache.parquet.pig.ParquetLoader
*/
public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown {
@@ -37,12 +37,12 @@ public ParquetLoader() throws FrontendException {
public ParquetLoader(String requestedSchemaStr) throws FrontendException {
try {
- init(new parquet.pig.ParquetLoader(requestedSchemaStr));
+ init(new org.apache.parquet.pig.ParquetLoader(requestedSchemaStr));
}
// if compile time dependency not found at runtime
catch (NoClassDefFoundError e) {
throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
- getClass().getName(), "parquet.pig.ParquetLoader"), 2259, e);
+ getClass().getName(), "org.apache.parquet.pig.ParquetLoader"), 2259, e);
}
}
@@ -52,7 +52,7 @@ private void init(LoadMetadata loadMetadata) {
@Override
public void setLocation(String location, Job job) throws IOException {
- JarManager.addDependencyJars(job, parquet.Version.class);
+ JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
super.setLocation(location, job);
}
diff --git a/src/org/apache/pig/builtin/ParquetStorer.java b/src/org/apache/pig/builtin/ParquetStorer.java
index 2052236eae..755fe16842 100644
--- a/src/org/apache/pig/builtin/ParquetStorer.java
+++ b/src/org/apache/pig/builtin/ParquetStorer.java
@@ -25,18 +25,18 @@
import org.apache.pig.impl.util.JarManager;
/**
- * Wrapper class which will delegate calls to parquet.pig.ParquetStorer
+ * Wrapper class which will delegate calls to org.apache.parquet.pig.ParquetStorer
*/
public class ParquetStorer extends StoreFuncMetadataWrapper {
public ParquetStorer() throws FrontendException {
try {
- init(new parquet.pig.ParquetStorer());
+ init(new org.apache.parquet.pig.ParquetStorer());
}
// if compile time dependency not found at runtime
catch (NoClassDefFoundError e) {
throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
- getClass().getName(), "parquet.pig.ParquetStorer"), 2259, e);
+ getClass().getName(), "org.apache.parquet.pig.ParquetStorer"), 2259, e);
}
}
@@ -49,7 +49,7 @@ private void init(StoreMetadata storeMetadata) {
*/
@Override
public void setStoreLocation(String location, Job job) throws IOException {
- JarManager.addDependencyJars(job, parquet.Version.class);
+ JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
super.setStoreLocation(location, job);
}