From 16037a41fde7818882bafd6536625b88a436ea53 Mon Sep 17 00:00:00 2001 From: krishnapriawan Date: Thu, 7 Nov 2024 15:21:05 +0900 Subject: [PATCH 1/5] Parquet-pig-bundle 1.9.0 --- build.xml | 4 ++-- ivy.xml | 2 +- ivy/libraries.properties | 4 ++-- src/org/apache/pig/builtin/ParquetLoader.java | 8 ++++---- src/org/apache/pig/builtin/ParquetStorer.java | 8 ++++---- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/build.xml b/build.xml index 84e4b5e125..97880bd76e 100644 --- a/build.xml +++ b/build.xml @@ -69,7 +69,7 @@ - + @@ -247,7 +247,7 @@ - + diff --git a/ivy.xml b/ivy.xml index 3f2c94373b..b969ad6092 100644 --- a/ivy.xml +++ b/ivy.xml @@ -405,7 +405,7 @@ - + diff --git a/ivy/libraries.properties b/ivy/libraries.properties index a0eb00acd2..44d7bb18a1 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -38,7 +38,7 @@ checkstyle.version=4.2 ivy.version=2.2.0 jasper.version=6.1.14 groovy.version=2.4.5 -guava.version=11.0 +guava.version=20.0 hadoop-common.version=2.7.3 hadoop-hdfs.version=2.7.3 hadoop-mapreduce.version=2.7.3 @@ -91,7 +91,7 @@ jansi.version=1.9 asm.version=3.3.1 snappy-java.version=1.1.1.3 tez.version=0.7.0 -parquet-pig-bundle.version=1.2.3 +parquet-pig-bundle.version=1.9.0 snappy.version=0.2 leveldbjni.version=1.8 curator.version=2.6.0 diff --git a/src/org/apache/pig/builtin/ParquetLoader.java b/src/org/apache/pig/builtin/ParquetLoader.java index 76516e3b1c..1a245de688 100644 --- a/src/org/apache/pig/builtin/ParquetLoader.java +++ b/src/org/apache/pig/builtin/ParquetLoader.java @@ -27,7 +27,7 @@ import org.apache.pig.impl.util.JarManager; /** - * Wrapper class which will delegate calls to parquet.pig.ParquetLoader + * Wrapper class which will delegate calls to org.apache.parquet.pig.ParquetLoader */ public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown { @@ -37,12 +37,12 @@ public ParquetLoader() throws FrontendException { public ParquetLoader(String requestedSchemaStr) throws FrontendException { try { - init(new parquet.pig.ParquetLoader(requestedSchemaStr)); + init(new org.apache.parquet.pig.ParquetLoader(requestedSchemaStr)); } // if compile time dependency not found at runtime catch (NoClassDefFoundError e) { throw new FrontendException(String.format("Cannot instantiate class %s (%s)", - getClass().getName(), "parquet.pig.ParquetLoader"), 2259, e); + getClass().getName(), "org.apache.parquet.pig.ParquetLoader"), 2259, e); } } @@ -52,7 +52,7 @@ private void init(LoadMetadata loadMetadata) { @Override public void setLocation(String location, Job job) throws IOException { - JarManager.addDependencyJars(job, parquet.Version.class); + JarManager.addDependencyJars(job, org.apache.parquet.Version.class); super.setLocation(location, job); } diff --git a/src/org/apache/pig/builtin/ParquetStorer.java b/src/org/apache/pig/builtin/ParquetStorer.java index 2052236eae..755fe16842 100644 --- a/src/org/apache/pig/builtin/ParquetStorer.java +++ b/src/org/apache/pig/builtin/ParquetStorer.java @@ -25,18 +25,18 @@ import org.apache.pig.impl.util.JarManager; /** - * Wrapper class which will delegate calls to parquet.pig.ParquetStorer + * Wrapper class which will delegate calls to org.apache.parquet.pig.ParquetStorer */ public class ParquetStorer extends StoreFuncMetadataWrapper { public ParquetStorer() throws FrontendException { try { - init(new parquet.pig.ParquetStorer()); + init(new org.apache.parquet.pig.ParquetStorer()); } // if compile time dependency not found at runtime catch (NoClassDefFoundError e) { throw new FrontendException(String.format("Cannot instantiate class %s (%s)", - getClass().getName(), "parquet.pig.ParquetStorer"), 2259, e); + getClass().getName(), "org.apache.parquet.pig.ParquetStorer"), 2259, e); } } @@ -49,7 +49,7 @@ private void init(StoreMetadata storeMetadata) { */ @Override public void setStoreLocation(String location, Job job) throws IOException { - JarManager.addDependencyJars(job, parquet.Version.class); + JarManager.addDependencyJars(job, org.apache.parquet.Version.class); super.setStoreLocation(location, job); } From 11010e739bd0b72ad1c961cf744f4df2e99c3a1f Mon Sep 17 00:00:00 2001 From: krishnapriawan Date: Thu, 7 Nov 2024 16:37:42 +0900 Subject: [PATCH 2/5] Updated versioning --- build.xml | 8 ++++---- ivy/libraries.properties | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/build.xml b/build.xml index 97880bd76e..32581e4896 100644 --- a/build.xml +++ b/build.xml @@ -31,15 +31,15 @@ - + - - + + @@ -84,7 +84,7 @@ - + diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 44d7bb18a1..787367a710 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -38,7 +38,7 @@ checkstyle.version=4.2 ivy.version=2.2.0 jasper.version=6.1.14 groovy.version=2.4.5 -guava.version=20.0 +guava.version=23.0 hadoop-common.version=2.7.3 hadoop-hdfs.version=2.7.3 hadoop-mapreduce.version=2.7.3 From ba7b3ad80d0bd37cd6fc99b0f7682dcceb01b043 Mon Sep 17 00:00:00 2001 From: Sonia Koza Date: Tue, 20 Jan 2026 11:45:19 +0100 Subject: [PATCH 3/5] implement Github Actions --- .github/workflows/ci.yml | 141 +++++++++++++++++++++++++++++++++++++++ .gitignore | 7 ++ 2 files changed, 148 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000000..ead4bb1644 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,141 @@ +name: CI + +on: + push: + branches: ['**'] + +jobs: + build: + name: Build + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Java 8 (OpenJDK 8) + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '8' + + - name: Verify Java installation + run: | + export JRE_HOME=${JAVA_HOME} + echo "JAVA_HOME: $JAVA_HOME" + echo "JRE_HOME: $JRE_HOME" + java -version + + - name: Download ant-contrib + run: | + mkdir -p ivy + curl -L -o ivy/ant-contrib-1.0b3.jar \ + https://repo1.maven.org/maven2/ant-contrib/ant-contrib/1.0b3/ant-contrib-1.0b3.jar + + - name: Build with Ant + run: | + export JRE_HOME=${JAVA_HOME} + ant jar -Dhadoopversion=2 -Dhbaseversion=1 + + - name: Verify JAR was built + run: | + ls -la build/*.jar + ls -la *.jar || true + + - name: Upload JAR as artifact + uses: actions/upload-artifact@v4 + with: + name: pig-jar + path: | + build/pig-*.jar + pig-*.jar + retention-days: 30 + + deploy: + name: Deploy to JFrog + runs-on: ubuntu-latest + needs: build + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/master' + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Java 8 (OpenJDK 8) + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '8' + + - name: Extract version from build.xml + id: version + run: | + # Extract pig.version and pig.version.suffix from build.xml + PIG_VERSION=$(grep -oP 'name="pig.version" value="\K[^"]+' build.xml) + PIG_SUFFIX=$(grep -oP 'name="pig.version.suffix" value="\K[^"]+' build.xml || echo "") + FULL_VERSION="${PIG_VERSION}${PIG_SUFFIX}" + echo "PIG_VERSION=${FULL_VERSION}" >> $GITHUB_ENV + echo "Detected version: ${FULL_VERSION}" + + - name: Download ant-contrib + run: | + mkdir -p ivy + curl -L -o ivy/ant-contrib-1.0b3.jar \ + https://repo1.maven.org/maven2/ant-contrib/ant-contrib/1.0b3/ant-contrib-1.0b3.jar + + - name: Build with Ant + run: | + export JRE_HOME=${JAVA_HOME} + ant jar -Dhadoopversion=2 -Dhbaseversion=1 + + - name: Setup JFrog CLI + uses: jfrog/setup-jfrog-cli@v4 + env: + JF_URL: https://xplenty.jfrog.io + JF_USER: ${{ secrets.JFROG_USERNAME }} + JF_PASSWORD: ${{ secrets.JFROG_PASSWORD }} + + - name: Configure JFrog CLI + run: | + jf config add xplenty-artifactory \ + --url=https://xplenty.jfrog.io/artifactory \ + --user=${{ secrets.JFROG_USERNAME }} \ + --password=${{ secrets.JFROG_PASSWORD }} \ + --interactive=false + + - name: Deploy to staging + if: github.ref == 'refs/heads/staging' + run: | + # Upload the main pig JAR + jf rt upload \ + "build/pig-*.jar" \ + "sharelib-stg/org/apache/pig/pig/${PIG_VERSION}/" \ + --server-id=xplenty-artifactory \ + --flat=false + + # Also upload the root-level JAR if it exists + if [ -f "pig-${PIG_VERSION}.jar" ]; then + jf rt upload \ + "pig-${PIG_VERSION}.jar" \ + "sharelib-stg/org/apache/pig/pig/${PIG_VERSION}/" \ + --server-id=xplenty-artifactory + fi + + - name: Deploy to production + if: github.ref == 'refs/heads/master' + run: | + # Upload the main pig JAR + jf rt upload \ + "build/pig-*.jar" \ + "sharelib/org/apache/pig/pig/${PIG_VERSION}/" \ + --server-id=xplenty-artifactory \ + --flat=false + + # Also upload the root-level JAR if it exists + if [ -f "pig-${PIG_VERSION}.jar" ]; then + jf rt upload \ + "pig-${PIG_VERSION}.jar" \ + "sharelib/org/apache/pig/pig/${PIG_VERSION}/" \ + --server-id=xplenty-artifactory + fi diff --git a/.gitignore b/.gitignore index a5504273cb..f26747984a 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,10 @@ pig-withouthadoop.jar *.crc # Mac finder artifacts .DS_Store + + +# Build artifacts +lib/ +legacy/ +pig-*.jar +${output.jarfile.withouthadoop-h2} \ No newline at end of file From e285e079573d670dd4220affe3af829b6b1b3bdd Mon Sep 17 00:00:00 2001 From: Sonia Koza Date: Tue, 20 Jan 2026 11:55:33 +0100 Subject: [PATCH 4/5] remove unused files --- .../POCombinerPackage.java | 179 ---------- .../relationalOperators/POJoinPackage.java | 305 ----------------- .../POMultiQueryPackage.java | 310 ------------------ .../relationalOperators/POPackageLite.java | 235 ------------- 4 files changed, 1029 deletions(-) delete mode 100644 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java delete mode 100644 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java delete mode 100644 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java delete mode 100644 src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java deleted file mode 100644 index 9105a0e195..0000000000 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; - -import java.util.Arrays; -import java.util.Map; - -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.InternalCachedBag; -import org.apache.pig.data.NonSpillableDataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.io.NullableTuple; -import org.apache.pig.impl.plan.NodeIdGenerator; -import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.impl.util.Pair; -/** - * The package operator that packages the globally rearranged tuples into - * output format after the combiner stage. It differs from POPackage in that - * it does not use the index in the NullableTuple to find the bag to put a - * tuple in. Instead, the inputs are put in a bag corresponding to their - * offset in the tuple. - */ -public class POCombinerPackage extends POPackage { - /** - * - */ - private static final long serialVersionUID = 1L; - - private static BagFactory mBagFactory = BagFactory.getInstance(); - private static TupleFactory mTupleFactory = TupleFactory.getInstance(); - - private boolean[] mBags; // For each field, indicates whether or not it - // needs to be put in a bag. - - private Map keyLookup; - - private int numBags; - - /** - * A new POPostCombinePackage will be constructed as a near clone of the - * provided POPackage. - * @param pkg POPackage to clone. - * @param bags for each field, indicates whether it should be a bag (true) - * or a simple field (false). - */ - public POCombinerPackage(POPackage pkg, boolean[] bags) { - super(new OperatorKey(pkg.getOperatorKey().scope, - NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)), - pkg.getRequestedParallelism(), pkg.getInputs()); - resultType = pkg.getResultType(); - keyType = pkg.keyType; - numInputs = 1; - inner = new boolean[1]; - for (int i = 0; i < pkg.inner.length; i++) { - inner[i] = true; - } - if (bags != null) { - mBags = Arrays.copyOf(bags, bags.length); - } - numBags = 0; - for (int i = 0; i < mBags.length; i++) { - if (mBags[i]) numBags++; - } - } - - @Override - public String name() { - return "POCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString(); - } - - @Override - public void visit(PhyPlanVisitor v) throws VisitorException { - v.visitCombinerPackage(this); - } - - /** - * @param keyInfo the keyInfo to set - */ - public void setKeyInfo(Map>> keyInfo) { - this.keyInfo = keyInfo; - // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the - // group case and not in cogroups. So there should only - // be one LocalRearrange from which we get the keyInfo for - // which field in the value is in the key. This LocalRearrange - // has an index of 0. When we do support combiner in Cogroups - // THIS WILL NEED TO BE REVISITED. - Pair> lrKeyInfo = - keyInfo.get(0); // assumption: only group are "combinable", hence index 0 - keyLookup = lrKeyInfo.second; - } - - private DataBag createDataBag(int numBags) { - String bagType = null; - if (PigMapReduce.sJobConfInternal.get() != null) { - bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type"); - } - - if (bagType != null && bagType.equalsIgnoreCase("default")) { - return new NonSpillableDataBag(); - } - return new InternalCachedBag(numBags); - } - - @Override - public Result getNextTuple() throws ExecException { - int keyField = -1; - //Create numInputs bags - Object[] fields = new Object[mBags.length]; - for (int i = 0; i < mBags.length; i++) { - if (mBags[i]) fields[i] = createDataBag(numBags); - } - - // For each indexed tup in the inp, split them up and place their - // fields into the proper bags. If the given field isn't a bag, just - // set the value as is. - while (tupIter.hasNext()) { - NullableTuple ntup = tupIter.next(); - Tuple tup = (Tuple)ntup.getValueAsPigType(); - - int tupIndex = 0; // an index for accessing elements from - // the value (tup) that we have currently - for(int i = 0; i < mBags.length; i++) { - Integer keyIndex = keyLookup.get(i); - if(keyIndex == null && mBags[i]) { - // the field for this index is not the - // key - so just take it from the "value" - // we were handed - Currently THIS HAS TO BE A BAG - // In future if this changes, THIS WILL NEED TO BE - // REVISITED. - ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex)); - tupIndex++; - } else { - // the field for this index is in the key - fields[i] = key; - } - } - } - - // The successor of the POCombinerPackage as of - // now SHOULD be a POForeach which has been adjusted - // to look for its inputs by projecting from the corresponding - // positions in the POCombinerPackage output. - // So we will NOT be adding the key in the result here but merely - // putting all bags into a result tuple and returning it. - Tuple res; - res = mTupleFactory.newTuple(mBags.length); - for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]); - Result r = new Result(); - r.result = res; - r.returnStatus = POStatus.STATUS_OK; - return r; - - } - -} diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java deleted file mode 100644 index 82f11ac6da..0000000000 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; - -import java.util.List; - -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.InternalCachedBag; -import org.apache.pig.data.NonSpillableDataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.NullableTuple; -import org.apache.pig.impl.plan.NodeIdGenerator; -import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.VisitorException; - -public class POJoinPackage extends POPackage { - - private static final long serialVersionUID = 1L; - - private POOptimizedForEach forEach; - private boolean newKey = true; - private Tuple res = null; - private boolean lastInputTuple = false; - private static final Tuple t1 = null; - private static final Result eopResult = new Result(POStatus.STATUS_EOP, null); - private boolean firstTime = true; - private boolean useDefaultBag = false; - - public static final String DEFAULT_CHUNK_SIZE = "1000"; - - private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE); - private Result forEachResult; - private DataBag[] dbs = null; - - private int lastBagIndex; - - public POJoinPackage(OperatorKey k, int rp, POPackage p, POForEach f) { - super(k, rp); - String scope = getOperatorKey().getScope(); - NodeIdGenerator nig = NodeIdGenerator.getGenerator(); - forEach = new POOptimizedForEach(new OperatorKey(scope,nig.getNextNodeId(scope))); - if (p!=null) - { - setKeyType(p.getKeyType()); - setNumInps(p.getNumInps()); - lastBagIndex = numInputs - 1; - setInner(p.getInner()); - setKeyInfo(p.getKeyInfo()); - this.isKeyTuple = p.isKeyTuple; - this.isKeyCompound = p.isKeyCompound; - } - if (f!=null) - { - setInputPlans(f.getInputPlans()); - setToBeFlattened(f.getToBeFlattened()); - } - } - - @Override - public void visit(PhyPlanVisitor v) throws VisitorException { - v.visitJoinPackage(this); - } - - @Override - public String name() { - String fString = forEach.getFlatStr(); - return "POJoinPackage" + "(" + fString + ")" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString(); - } - - /** - * Calls getNext to get next ForEach result. The input for POJoinPackage is - * a (key, NullableTuple) pair. We will materialize n-1 inputs into bags, feed input#n - * one tuple a time to the delegated ForEach operator, the input for ForEach is - * - * (input#1, input#2, input#3....input#n[i]), i=(1..k), suppose input#n consists - * - * of k tuples. - * For every ForEach input, pull all the results from ForEach. - * getNext will be called multiple times for a particular input, - * it returns one output tuple from ForEach every time we call getNext, - * so we need to maintain internal status to keep tracking of where we are. - */ - @Override - public Result getNextTuple() throws ExecException { - - if(firstTime){ - firstTime = false; - if (PigMapReduce.sJobConfInternal.get() != null) { - String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type"); - if (bagType != null && bagType.equalsIgnoreCase("default")) { - useDefaultBag = true; - } - } - } - // if a previous call to foreach.getNext() - // has still not returned all output, process it - if (forEach.processingPlan) - { - forEachResult = forEach.getNextTuple(); - switch (forEachResult.returnStatus) - { - case POStatus.STATUS_OK: - case POStatus.STATUS_NULL: - case POStatus.STATUS_ERR: - return forEachResult; - case POStatus.STATUS_EOP: - break; - } - } - - NullableTuple it = null; - - // If we see a new NullableTupleIterator, materialize n-1 inputs, construct ForEach input - // tuple res = (key, input#1, input#2....input#n), the only missing value is input#n, - // we will get input#n one tuple a time, fill in res, feed to ForEach. - // After this block, we have the first tuple of input#n in hand (kept in variable it) - if (newKey) - { - lastInputTuple = false; - //Put n-1 inputs into bags - dbs = new DataBag[numInputs]; - for (int i = 0; i < numInputs - 1; i++) { - dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag() - // In a very rare case if there is a POStream after this - // POJoinPackage in the pipeline and is also blocking the pipeline; - // constructor argument should be 2 * numInputs. But for one obscure - // case we don't want to pay the penalty all the time. - : new InternalCachedBag(numInputs-1); - } - // For last bag, we always use NonSpillableBag. - dbs[lastBagIndex] = new NonSpillableDataBag((int)chunkSize); - - //For each Nullable tuple in the input, put it - //into the corresponding bag based on the index, - // except for the last input, which we will stream - // The tuples will arrive in the order of the index, - // starting from index 0 and such that all tuples for - // a given index arrive before a tuple for the next - // index does. - while (tupIter.hasNext()) { - it = tupIter.next(); - int itIndex = it.getIndex(); - if (itIndex!= numInputs - 1) - { - dbs[itIndex].add(getValueTuple(it, itIndex)); - } - else - { - lastInputTuple = true; - break; - } - if(getReporter()!=null) { - getReporter().progress(); - } - } - // If we don't have any tuple for input#n - // we do not need any further process, return EOP - if (!lastInputTuple) - { - // we will return at this point because we ought - // to be having a flatten on this last input - // and we have an empty bag which should result - // in this key being taken out of the output - newKey = true; - return eopResult; - } - - res = mTupleFactory.newTuple(numInputs+1); - for (int i = 0; i < dbs.length; i++) - res.set(i+1,dbs[i]); - - res.set(0,key); - // if we have an inner anywhere and the corresponding - // bag is empty, we can just return - for (int i = 0; i < dbs.length - 1; i++) { - if(inner[i]&&dbs[i].size()==0){ - detachInput(); - return eopResult; - } - } - newKey = false; - - // set up the bag with last input to contain - // a chunk of CHUNKSIZE values OR the entire bag if - // it has less than CHUNKSIZE values - the idea is in most - // cases the values are > CHUNKSIZE in number and in - // those cases we will be sending the last bag - // as a set of smaller chunked bags thus holding lesser - // in memory - - // the first tuple can be directly retrieved from "it" - dbs[lastBagIndex].add(getValueTuple(it, it.getIndex())); - for(int i = 0; i < chunkSize -1 && tupIter.hasNext(); i++) { - it = tupIter.next(); - dbs[lastBagIndex].add(getValueTuple(it, it.getIndex())); - } - - // Attach the input to forEach - forEach.attachInput(res); - - // pull output tuple from ForEach - Result forEachResult = forEach.getNextTuple(); - { - switch (forEachResult.returnStatus) - { - case POStatus.STATUS_OK: - case POStatus.STATUS_NULL: - case POStatus.STATUS_ERR: - return forEachResult; - case POStatus.STATUS_EOP: - break; - } - } - } - - // Keep attaching input tuple to ForEach, until: - // 1. We can initialize ForEach.getNext(); - // 2. There is no more input#n - while (true) - { - if (tupIter.hasNext()) { - // try setting up a bag of CHUNKSIZE OR - // the remainder of the bag of last input - // (if < CHUNKSIZE) to foreach - dbs[lastBagIndex].clear(); // clear last chunk - for(int i = 0; i < chunkSize && tupIter.hasNext(); i++) { - it = tupIter.next(); - dbs[lastBagIndex].add(getValueTuple(it, it.getIndex())); - } - } - else - // if we do not have any more tuples for input#n, return EOP - { - detachInput(); - newKey = true; - return eopResult; - } - // Attach the input to forEach - forEach.attachInput(res); - - // pull output tuple from ForEach - Result forEachResult = forEach.getNextTuple(); - { - switch (forEachResult.returnStatus) - { - case POStatus.STATUS_OK: - case POStatus.STATUS_NULL: - case POStatus.STATUS_ERR: - return forEachResult; - case POStatus.STATUS_EOP: - break; - } - } - } - } - - public List getInputPlans() { - return forEach.getInputPlans(); - } - - public void setInputPlans(List plans) { - forEach.setInputPlans(plans); - } - - public void setToBeFlattened(List flattens) { - forEach.setToBeFlattened(flattens); - } - - /** - * @return the forEach - */ - public POOptimizedForEach getForEach() { - return forEach; - } - - /** - * @param chunkSize - the chunk size for the biggest input - */ - public void setChunkSize(long chunkSize) { - this.chunkSize = chunkSize; - } -} diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java deleted file mode 100644 index d6041741a3..0000000000 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -import org.apache.pig.PigException; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.NullableTuple; -import org.apache.pig.impl.io.NullableUnknownWritable; -import org.apache.pig.impl.io.PigNullableWritable; -import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.backend.hadoop.HDataType; - -/** - * The package operator that packages the globally rearranged tuples - * into output format as required by multi-query de-multiplexer. - *

- * This operator is used when merging multiple Map-Reduce splittees - * into a Map-only splitter during multi-query optimization. - * The package operators of the reduce plans of the splittees form an - * indexed package list inside this operator. When this operator - * receives an input, it extracts the index from the key and calls the - * corresponding package to get the output data. - *

- * Due to the recursive nature of multi-query optimization, this operator - * may be contained in another multi-query packager. - *

- * The successor of this operator must be a PODemux operator which - * knows how to consume the output of this operator. - */ -public class POMultiQueryPackage extends POPackage { - - private static final long serialVersionUID = 1L; - - private static int idxPart = 0x7F; - - private List packages = new ArrayList(); - - /** - * If the POLocalRearranges corresponding to the reduce plans in - * myPlans (the list of inner plans of the demux) have different key types - * then the MultiQueryOptimizer converts all the keys to be of type tuple - * by wrapping any non-tuple keys into Tuples (keys which are already tuples - * are left alone). - * The list below is a list of booleans indicating whether extra tuple wrapping - * was done for the key in the corresponding POLocalRearranges and if we need - * to "unwrap" the tuple to get to the key - */ - private ArrayList isKeyWrapped = new ArrayList(); - - /* - * Indicating if all the inner plans have the same - * map key type. If not, the keys passed in are - * wrapped inside tuples and need to be extracted - * out during the reduce phase - */ - private boolean sameMapKeyType = true; - - /* - * Indicating if this operator is in a combiner. - * If not, this operator is in a reducer and the key - * values must first be extracted from the tuple-wrap - * before writing out to the disk - */ - private boolean inCombiner = false; - - transient private PigNullableWritable myKey; - - /** - * Constructs an operator with the specified key. - * - * @param k the operator key - */ - public POMultiQueryPackage(OperatorKey k) { - this(k, -1, null); - } - - /** - * Constructs an operator with the specified key - * and degree of parallelism. - * - * @param k the operator key - * @param rp the degree of parallelism requested - */ - public POMultiQueryPackage(OperatorKey k, int rp) { - this(k, rp, null); - } - - /** - * Constructs an operator with the specified key and inputs. - * - * @param k the operator key - * @param inp the inputs that this operator will read data from - */ - public POMultiQueryPackage(OperatorKey k, List inp) { - this(k, -1, inp); - } - - /** - * Constructs an operator with the specified key, - * degree of parallelism and inputs. - * - * @param k the operator key - * @param rp the degree of parallelism requested - * @param inp the inputs that this operator will read data from - */ - public POMultiQueryPackage(OperatorKey k, int rp, List inp) { - super(k, rp, inp); - } - - @Override - public String name() { - return "MultiQuery Package [" + isKeyWrapped + "] - " + getOperatorKey().toString(); - } - - @Override - public boolean supportsMultipleInputs() { - return false; - } - - @Override - public void visit(PhyPlanVisitor v) throws VisitorException { - v.visitMultiQueryPackage(this); - } - - @Override - public boolean supportsMultipleOutputs() { - return false; - } - - @Override - public void attachInput(PigNullableWritable k, Iterator inp) { - tupIter = inp; - myKey = k; - } - - @Override - public void detachInput() { - tupIter = null; - myKey = null; - } - - /** - * Appends the specified package object to the end of - * the package list. - * - * @param pack package to be appended to the list - */ - public void addPackage(POPackage pack) { - packages.add(pack); - } - - /** - * Appends the specified package object to the end of - * the package list. - * - * @param pack package to be appended to the list - * @param mapKeyType the map key type associated with the package - */ - public void addPackage(POPackage pack, byte mapKeyType) { - packages.add(pack); - // if mapKeyType is already a tuple, we will NOT - // be wrapping it in an extra tuple. If it is not - // a tuple, we will wrap into in a tuple - isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true); - } - - /** - * Returns the list of packages. - * - * @return the list of the packages - */ - public List getPackages() { - return packages; - } - - /** - * Constructs the output tuple from the inputs. - *

- * The output is consumed by for the demultiplexer operator - * (PODemux) in the format (key, {bag of tuples}) where key - * is an indexed WritableComparable, not the wrapped value as a pig type. - */ - @Override - public Result getNextTuple() throws ExecException { - - byte origIndex = myKey.getIndex(); - - int index = (int)origIndex; - index &= idxPart; - - if (index >= packages.size() || index < 0) { - int errCode = 2140; - String msg = "Invalid package index " + index - + " should be in the range between 0 and " + packages.size(); - throw new ExecException(msg, errCode, PigException.BUG); - } - - POPackage pack = packages.get(index); - - // check to see if we need to unwrap the key. The keys may be - // wrapped inside a tuple by LocalRearrange operator when jobs - // with different map key types are merged - PigNullableWritable curKey = myKey; - if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) { - Tuple tup = (Tuple)myKey.getValueAsPigType(); - curKey = HDataType.getWritableComparableTypes(tup.get(0), pack.getKeyType()); - curKey.setIndex(origIndex); - } - - pack.attachInput(curKey, tupIter); - - Result res = pack.getNextTuple(); - pack.detachInput(); - - Tuple tuple = (Tuple)res.result; - - // the object present in the first field - // of the tuple above is the real data without - // index information - this is because the - // package above, extracts the real data out of - // the PigNullableWritable object - we are going to - // give this result tuple to a PODemux operator - // which needs a PigNullableWritable first field so - // it can figure out the index. Therefore we need - // to add index to the first field of the tuple. - - Object obj = tuple.get(0); - if (obj instanceof PigNullableWritable) { - ((PigNullableWritable)obj).setIndex(origIndex); - } - else { - PigNullableWritable myObj = null; - if (obj == null) { - myObj = new NullableUnknownWritable(); - myObj.setNull(true); - } - else { - myObj = HDataType.getWritableComparableTypes(obj, HDataType.findTypeFromNullableWritable(curKey)); - } - myObj.setIndex(origIndex); - tuple.set(0, myObj); - } - // illustrator markup has been handled by "pack" - return res; - } - - /** - * Returns the list of booleans that indicates if the - * key needs to unwrapped for the corresponding plan. - * - * @return the list of isKeyWrapped boolean values - */ - public List getIsKeyWrappedList() { - return Collections.unmodifiableList(isKeyWrapped); - } - - /** - * Adds a list of IsKeyWrapped boolean values - * - * @param lst the list of boolean values to add - */ - public void addIsKeyWrappedList(List lst) { - for (Boolean b : lst) { - isKeyWrapped.add(b); - } - } - - public void setInCombiner(boolean inCombiner) { - this.inCombiner = inCombiner; - } - - public boolean isInCombiner() { - return inCombiner; - } - - public void setSameMapKeyType(boolean sameMapKeyType) { - this.sameMapKeyType = sameMapKeyType; - } - - public boolean isSameMapKeyType() { - return sameMapKeyType; - } - -} diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java deleted file mode 100644 index c200715e4a..0000000000 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.pig.impl.util.IdentityHashSet; -import org.apache.pig.impl.util.Pair; - -import org.apache.pig.PigException; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.ReadOnceBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.NullableTuple; -import org.apache.pig.impl.plan.NodeIdGenerator; -import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.pen.util.ExampleTuple; -import org.apache.pig.pen.util.LineageTracer; - -/** - * This package operator is a specialization - * of POPackage operator used for the specific - * case of the order by query. See JIRA 802 - * for more details. - */ -public class POPackageLite extends POPackage { - - /** - * - */ - private static final long serialVersionUID = 1L; - - public POPackageLite(OperatorKey k) { - super(k, -1, null); - } - - public POPackageLite(OperatorKey k, int rp) { - super(k, rp, null); - } - - public POPackageLite(OperatorKey k, List inp) { - super(k, -1, inp); - } - - public POPackageLite(OperatorKey k, int rp, List inp) { - super(k, rp, inp); - } - - /* (non-Javadoc) - * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage#setNumInps(int) - */ - @Override - public void setNumInps(int numInps) { - if(numInps != 1) - { - throw new RuntimeException("POPackageLite can only take 1 input"); - } - this.numInputs = numInps; - } - - public boolean[] getInner() { - throw new RuntimeException("POPackageLite does not support getInner operation"); - } - - public void setInner(boolean[] inner) { - throw new RuntimeException("POPackageLite does not support setInner operation"); - } - - /** - * Make a deep copy of this operator. - * @throws CloneNotSupportedException - */ - @Override - public POPackageLite clone() throws CloneNotSupportedException { - POPackageLite clone = (POPackageLite)super.clone(); - clone.inner = null; - clone.keyInfo = new HashMap>>(); - for (Entry>> entry: keyInfo.entrySet()) { - clone.keyInfo.put(entry.getKey(), entry.getValue()); - } - return clone; - } - - /** - * @return the distinct - */ - @Override - public boolean isDistinct() { - throw new RuntimeException("POPackageLite does not support isDistinct operation"); - } - - /** - * @param distinct the distinct to set - */ - @Override - public void setDistinct(boolean distinct) { - throw new RuntimeException("POPackageLite does not support setDistinct operation"); - } - - /** - * @return the isKeyTuple - */ - public boolean getKeyTuple() { - return isKeyTuple; - } - - /** - * @return the keyAsTuple - */ - public Tuple getKeyAsTuple() { - return keyAsTuple; - } - - /** - * @return the tupIter - */ - public Iterator getTupIter() { - return tupIter; - } - - /** - * @return the key - */ - public Object getKey() { - return key; - } - - /** - * Similar to POPackage.getNext except that - * only one input is expected with index 0 - * and ReadOnceBag is used instead of - * DefaultDataBag. - */ - @Override - public Result getNextTuple() throws ExecException { - Tuple res; - //Create numInputs bags - ReadOnceBag db = null; - db = new ReadOnceBag(this, tupIter, key); - if(getReporter()!=null) { - getReporter().progress(); - } - - //Construct the output tuple by appending - //the key and all the above constructed bags - //and return it. - res = mTupleFactory.newTuple(numInputs+1); - res.set(0,key); - res.set(1,db); - detachInput(); - Result r = new Result(); - r.returnStatus = POStatus.STATUS_OK; - r.result = illustratorMarkup(null, res, 0); - return r; - } - - /** - * Makes use of the superclass method, but this requires - * an additional parameter key passed by ReadOnceBag. - * key of this instance will be set to null in detachInput - * call, but an instance of ReadOnceBag may have the original - * key that it uses. Therefore this extra argument is taken - * to temporarily set it before the call to the superclass method - * and then restore it. - */ - public Tuple getValueTuple(NullableTuple ntup, int index, Object key) throws ExecException { - Object origKey = this.key; - this.key = key; - Tuple retTuple = super.getValueTuple(ntup, index); - this.key = origKey; - return retTuple; - } - - @Override - public String name() { - return getAliasString() + "PackageLite" + "[" - + DataType.findTypeName(resultType) + "]" + "{" - + DataType.findTypeName(keyType) + "}" + " - " - + mKey.toString(); - } - - @Override - public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { - if(illustrator != null) { - ExampleTuple tOut = new ExampleTuple((Tuple) out); - LineageTracer lineageTracer = illustrator.getLineage(); - lineageTracer.insert(tOut); - if (illustrator.getEquivalenceClasses() == null) { - LinkedList> equivalenceClasses = new LinkedList>(); - for (int i = 0; i < numInputs; ++i) { - IdentityHashSet equivalenceClass = new IdentityHashSet(); - equivalenceClasses.add(equivalenceClass); - } - illustrator.setEquivalenceClasses(equivalenceClasses, this); - } - illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut); - tOut.synthetic = false; // not expect this to be really used - illustrator.addData((Tuple) tOut); - return tOut; - } else - return (Tuple) out; - } -} - From 31fb64b6b83f47f23842abdeae901970d2692098 Mon Sep 17 00:00:00 2001 From: Sonia Koza Date: Thu, 22 Jan 2026 08:56:13 +0100 Subject: [PATCH 5/5] update uplaod path --- .github/workflows/ci.yml | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ead4bb1644..731f0d6cc9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,35 +107,19 @@ jobs: - name: Deploy to staging if: github.ref == 'refs/heads/staging' run: | - # Upload the main pig JAR + # Upload the pig JAR to the correct path structure jf rt upload \ - "build/pig-*.jar" \ - "sharelib-stg/org/apache/pig/pig/${PIG_VERSION}/" \ + "build/pig-23-withouthadoop-*.jar" \ + "sharelib-stg/common/pig-23-withouthadoop/${PIG_VERSION}/" \ --server-id=xplenty-artifactory \ - --flat=false - - # Also upload the root-level JAR if it exists - if [ -f "pig-${PIG_VERSION}.jar" ]; then - jf rt upload \ - "pig-${PIG_VERSION}.jar" \ - "sharelib-stg/org/apache/pig/pig/${PIG_VERSION}/" \ - --server-id=xplenty-artifactory - fi + --flat=true - name: Deploy to production if: github.ref == 'refs/heads/master' run: | - # Upload the main pig JAR + # Upload the pig JAR to the correct path structure jf rt upload \ - "build/pig-*.jar" \ - "sharelib/org/apache/pig/pig/${PIG_VERSION}/" \ + "build/pig-23-withouthadoop-*.jar" \ + "sharelib/common/pig-23-withouthadoop/${PIG_VERSION}/" \ --server-id=xplenty-artifactory \ - --flat=false - - # Also upload the root-level JAR if it exists - if [ -f "pig-${PIG_VERSION}.jar" ]; then - jf rt upload \ - "pig-${PIG_VERSION}.jar" \ - "sharelib/org/apache/pig/pig/${PIG_VERSION}/" \ - --server-id=xplenty-artifactory - fi + --flat=true