Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
740 changes: 740 additions & 0 deletions .github/workflows/behave-cloudberry.yml

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions .github/workflows/build-cloudberry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ jobs:
DEFAULT_ENABLE_CGROUPS=false
DEFAULT_ENABLE_CORE_CHECK=true
DEFAULT_PG_SETTINGS_OPTIMIZER=""

# Define base test configurations
ALL_TESTS='{
"include": [
Expand Down Expand Up @@ -1606,8 +1605,6 @@ jobs:
continue
fi

# Parse this configuration's results

MAKE_NAME="${{ matrix.test }}-config$i" \
"${SRC_DIR}"/devops/build/automation/cloudberry/scripts/parse-test-results.sh "$config_log"
status_code=$?
Expand Down
6 changes: 4 additions & 2 deletions gpAux/gpdemo/demo_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,10 @@ cat >> $CLUSTER_CONFIG <<-EOF

COORDINATOR_PORT=${COORDINATOR_DEMO_PORT}

# Shell to use to execute commands on all hosts
TRUSTED_SHELL="$(dirname "$0")/lalshell"
# Shell to use to execute commands on all hosts. Use an absolute path here
# because this file is later sourced by gpinitsystem, where \$0 is no longer
# demo_cluster.sh.
TRUSTED_SHELL=$(pwd)/lalshell

ENCODING=UNICODE
EOF
Expand Down
19 changes: 14 additions & 5 deletions gpMgmt/bin/analyzedb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ ANALYZE_ROOT_SQL = """analyze rootpartition %s"""
REPORTS_ARE_STALE_AFTER_N_DAYS = 8
NUM_REPORTS_TO_SAVE = 3


def safe_log_string(value):
    """Return an ASCII-only rendering of ``value`` suitable for log output.

    Non-string values are converted with ``str()`` first.  Every non-ASCII
    character in the resulting text is then replaced by its backslash escape
    sequence, so the returned string can always be encoded as ASCII.
    """
    text = value if isinstance(value, str) else str(value)
    # Escape after the str() conversion as well: an object's __str__ may
    # itself return non-ASCII text (e.g. a command carrying UTF-8 output).
    return text.encode('ascii', 'backslashreplace').decode('ascii')

GET_ALL_DATA_TABLES_SQL = """
select n.nspname as schemaname, c.relname as tablename
from pg_class c, pg_namespace n
Expand Down Expand Up @@ -430,7 +436,7 @@ class AnalyzeDb(Operation):
target = self._get_tablename_with_cols(can_schema, can_table, input_col_dict)
else: # can in root_partition_col_dict
target = self._get_tablename_with_cols(can_schema, can_table, root_partition_col_dict)
logger.info(target)
logger.info(safe_log_string(target))
target_list.append(target)
logger.info("---------------------------------------------------")

Expand Down Expand Up @@ -951,7 +957,10 @@ class AnalyzeDb(Operation):
# Create a Command object that executes a query using psql.
def create_psql_command(dbname, query):
    """Build a Command that runs ``query`` against ``dbname`` through psql.

    The shell command passes ``dbname`` and ``query`` through ``pipes.quote``
    unchanged, while the Command's display name is the ASCII-escaped form of
    ``query``.
    """
    psql_cmd = """psql %s -c %s""" % (pipes.quote(dbname), pipes.quote(query))
    # Keep the command text intact for execution, but make the display name
    # ASCII-safe so logger/output paths do not choke on UTF-8 identifiers.
    safe_query_display = query.encode('ascii', 'backslashreplace').decode('ascii')
    return Command(safe_query_display, psql_cmd)


def run_sql(conn, query):
Expand Down Expand Up @@ -1402,13 +1411,13 @@ class AnalyzeWorker(Worker):
self.cmd = None
return
elif self.pool.should_stop:
self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))
self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, safe_log_string(self.cmd)))
self.pool.markTaskDone()
self.cmd = None
else:
# run the command
# get rid of the gucs for displaying in the log
cmd_display = re.sub(r'set .*;\s*', '', self.cmd.name)
cmd_display = safe_log_string(re.sub(r'set .*;\s*', '', self.cmd.name))
self.logger.info("[%s] started %s" % (self.name, cmd_display))
start_time = time.time()
self.cmd.run()
Expand All @@ -1425,7 +1434,7 @@ class AnalyzeWorker(Worker):
except Exception as e:
self.logger.exception(e)
if self.cmd:
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, safe_log_string(self.cmd)))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd = None

Expand Down
22 changes: 14 additions & 8 deletions gpMgmt/bin/gppylib/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@

CMD_CACHE = {}


def _safe_log_string(value):
    """Return an ASCII-only rendering of ``value`` suitable for log output.

    Non-string values are converted with ``str()`` first.  Every non-ASCII
    character in the resulting text is then replaced by its backslash escape
    sequence, so the returned string can always be encoded as ASCII.
    """
    text = value if isinstance(value, str) else str(value)
    # Escape after the str() conversion as well: an object's __str__ may
    # itself return non-ASCII text (e.g. a command carrying UTF-8 output).
    return text.encode('ascii', 'backslashreplace').decode('ascii')

# Maximum retries if sshd rejects the connection due to too many
# unauthenticated connections.
SSH_MAX_RETRY = 10
Expand Down Expand Up @@ -86,7 +92,7 @@ def markTaskDone(self):
self.work_queue.task_done()

def addCommand(self, cmd):
    """Put ``cmd`` on the work queue and count it as assigned.

    Only the ASCII-escaped form of ``cmd.cmdStr`` is written to the debug
    log, so command strings containing non-ASCII text are never logged raw.
    """
    self.logger.debug("Adding cmd to work_queue: %s" % _safe_log_string(cmd.cmdStr))
    self.work_queue.put(cmd)
    self._assigned += 1

Expand Down Expand Up @@ -272,20 +278,20 @@ def run(self):
self.cmd = None
return
elif self.pool.should_stop:
self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))
self.logger.debug("[%s] got cmd and pool is stopped: %s" % (self.name, _safe_log_string(self.cmd)))
self.pool.markTaskDone()
self.cmd = None
else:
self.logger.debug("[%s] got cmd: %s" % (self.name, self.cmd.cmdStr))
self.logger.debug("[%s] got cmd: %s" % (self.name, _safe_log_string(self.cmd.cmdStr)))
self.cmd.run()
self.logger.debug("[%s] finished cmd: %s" % (self.name, self.cmd))
self.logger.debug("[%s] finished cmd: %s" % (self.name, _safe_log_string(self.cmd)))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd = None

except Exception as e:
self.logger.exception(e)
if self.cmd:
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
self.logger.debug("[%s] finished cmd with exception: %s" % (self.name, _safe_log_string(self.cmd)))
self.pool.addFinishedWorkItem(self.cmd)
self.cmd = None

Expand Down Expand Up @@ -548,9 +554,9 @@ def __init__(self, name, cmdStr, ctxt=LOCAL, remoteHost=None, stdin=None, gphome

def __str__(self):
    """Describe the command by its name and ASCII-escaped command string.

    The results are appended when ``self.results`` is truthy.
    """
    if self.results:
        return "%s cmdStr='%s' had result: %s" % (self.name, _safe_log_string(self.cmdStr), self.results)
    else:
        return "%s cmdStr='%s'" % (self.name, _safe_log_string(self.cmdStr))

# Start a process that will execute the command but don't wait for
# it to complete. Return the Popen object instead.
Expand All @@ -559,7 +565,7 @@ def runNoWait(self):
return self.exec_context.proc

def run(self, validateAfter=False):
self.logger.debug("Running Command: %s" % self.cmdStr)
self.logger.debug("Running Command: %s" % _safe_log_string(self.cmdStr))
self.exec_context.execute(self, pickled=self.pickled, start_new_session=self.start_new_session)

if validateAfter:
Expand Down
22 changes: 22 additions & 0 deletions gpMgmt/bin/gppylib/test/unit/test_unit_analyzedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import imp
import os

from gppylib.test.unit.gp_unittest import GpTestCase, run_tests


class AnalyzeDbTestCase(GpTestCase):
    """Unit tests for helpers defined in the extension-less analyzedb script."""

    def setUp(self):
        # Resolve the script relative to this test file.  Making __file__
        # absolute first keeps the lookup correct even when its directory
        # part is empty (plain string concatenation would then yield
        # "/analyzedb").
        test_dir = os.path.dirname(os.path.abspath(__file__))
        analyzedb_file = os.path.normpath(
            os.path.join(test_dir, "..", "..", "..", "analyzedb"))
        # NOTE(review): imp is deprecated and was removed in Python 3.12;
        # this loader will need an importlib-based replacement there.
        self.subject = imp.load_source('analyzedb', analyzedb_file)

    def test_create_psql_command_keeps_utf8_sql_but_uses_ascii_safe_display_name(self):
        query = 'analyze "public"."spiegelungssätze"'

        cmd = self.subject.create_psql_command('special_encoding_db', query)

        # The display name is escaped, the executed command text is not.
        self.assertEqual(cmd.name, 'analyze "public"."spiegelungss\\xe4tze"')
        self.assertIn('spiegelungssätze', cmd.cmdStr)

if __name__ == '__main__':
run_tests()
22 changes: 16 additions & 6 deletions gpMgmt/sbin/gpsegstop.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,28 @@ def run(self):
self.logger.info("Clearing segment instance lock files")
os.remove(lockfile)

status = SegStopStatus(self.datadir, True,
"Forceful termination success: rc: %d stdout: %s stderr: %s." % (
results.rc, results.stdout, results.stderr))

try:
# Use the process list and make sure that all the processes are killed at the end
# Use localhost since gpsegstop is run locally on segment hosts
unix.kill_9_segment_processes(self.datadir, postgres_pids, 'localhost')

if unix.check_pid(mypid) and mypid != -1:
# Verify that none of the known pids are still alive
any_alive = False
for pid in postgres_pids:
if unix.check_pid(pid):
any_alive = True
break
if not any_alive and succeeded and mypid != -1:
if unix.check_pid(mypid):
any_alive = True

if any_alive:
status = SegStopStatus(self.datadir, False,
"Failed forceful termnation: rc: %d stdout: %s stderr: %s." % (
"Failed forceful termination: rc: %d stdout: %s stderr: %s." % (
results.rc, results.stdout, results.stderr))
elif not is_shutdown:
status = SegStopStatus(self.datadir, True,
"Forceful termination success: rc: %d stdout: %s stderr: %s." % (
results.rc, results.stdout, results.stderr))

self.result = status
Expand Down
1 change: 1 addition & 0 deletions gpMgmt/test/behave/mgmt_utils/analyzedb.feature
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,7 @@ Feature: Incrementally analyze the database
And the user runs "dropdb schema_with_temp_table"
And the user drops the named connection "default"

@extended
Scenario: analyzedb can handle the table name with special utf-8 characters.
Given database "special_encoding_db" is dropped and recreated
And the user connects to "special_encoding_db" with named connection "default"
Expand Down
31 changes: 29 additions & 2 deletions gpMgmt/test/behave/mgmt_utils/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from steps.gpssh_exkeys_mgmt_utils import GpsshExkeysMgmtContext
from steps.mgmt_utils import backup_bashrc, restore_bashrc
from gppylib.db import dbconn
from gppylib.commands.base import Command, REMOTE

def before_all(context):
if list(map(int, behave.__version__.split('.'))) < [1,2,6]:
Expand Down Expand Up @@ -62,19 +63,27 @@ def before_feature(context, feature):
dbconn.execSQL(context.conn, 'create table t1(a integer, b integer)')
dbconn.execSQL(context.conn, 'create table t2(c integer, d integer)')
dbconn.execSQL(context.conn, 'create table t3(e integer, f integer)')
dbconn.execSQL(context.conn, 'create table spiegelungssätze(col_ä integer, 列2 integer)')
dbconn.execSQL(context.conn, 'create view v1 as select a, b from t1, t3 where t1.a=t3.e')
dbconn.execSQL(context.conn, 'create view v2 as select c, d from t2, t3 where t2.c=t3.f')
dbconn.execSQL(context.conn, 'create view v3 as select a, d from v1, v2 where v1.a=v2.c')
dbconn.execSQL(context.conn, 'insert into t1 values(1, 2)')
dbconn.execSQL(context.conn, 'insert into t2 values(1, 3)')
dbconn.execSQL(context.conn, 'insert into t3 values(1, 4)')
dbconn.execSQL(context.conn, 'insert into spiegelungssätze values(1, 5)')
# minirepro tests require statistical data about the contents of the database
# we should execute 'ANALYZE' to fill the pg_statistic catalog table.
dbconn.execSQL(context.conn, 'analyze t1')
dbconn.execSQL(context.conn, 'analyze t2')
dbconn.execSQL(context.conn, 'analyze t3')
dbconn.execSQL(context.conn, 'analyze spiegelungssätze')
dbconn.execSQL(context.conn, 'create or replace function select_one() returns integer as $$ select 1 $$ language sql')
context.conn.commit()

if 'gppkg' in feature.tags:
run_command(context, 'bash demo/gppkg/generate_sample_gppkg.sh buildGppkg')
run_command(context, 'cp -f /tmp/sample-gppkg/sample.gppkg test/behave/mgmt_utils/steps/data/')


def after_feature(context, feature):
if 'analyzedb' in feature.tags:
context.conn.close()
Expand Down Expand Up @@ -102,6 +111,9 @@ def before_scenario(context, scenario):
if 'gprecoverseg' in context.feature.tags:
context.mirror_context = MirrorMgmtContext()

if 'gprecoverseg_newhost' in context.feature.tags:
context.mirror_context = MirrorMgmtContext()

if 'gpconfig' in context.feature.tags:
context.gpconfig_context = GpConfigContext()

Expand Down Expand Up @@ -146,11 +158,17 @@ def after_scenario(context, scenario):
return

tags_to_cleanup = ['gpmovemirrors', 'gpssh-exkeys']
if set(context.feature.tags).intersection(tags_to_cleanup):
if set(context.feature.tags).intersection(tags_to_cleanup) and "skip_cleanup" not in scenario.effective_tags:
if 'temp_base_dir' in context and os.path.exists(context.temp_base_dir):
os.chmod(context.temp_base_dir, 0o700)
shutil.rmtree(context.temp_base_dir)

if 'umount_required' in context and context.umount_required:
context.execute_steps('''
# unmounting all mounter filesystem in concourse cluster
Then umount all mounted filesystem
''')

tags_to_not_restart_db = ['analyzedb', 'gpssh-exkeys']
if not set(context.feature.tags).intersection(tags_to_not_restart_db):
start_database_if_not_started(context)
Expand Down Expand Up @@ -182,3 +200,12 @@ def after_scenario(context, scenario):
execute_sql('postgres', create_fault_query)
reset_fault_query = "SELECT gp_inject_fault_infinite('all', 'reset', dbid) FROM gp_segment_configuration WHERE status='u';"
execute_sql('postgres', reset_fault_query)

if os.getenv('SUSPEND_PG_REWIND') is not None:
del os.environ['SUSPEND_PG_REWIND']

if "remove_rsync_bash" in scenario.effective_tags:
for host in context.hosts_with_rsync_bash:
cmd = Command(name='remove /usr/local/bin/rsync', cmdStr="sudo rm /usr/local/bin/rsync", remoteHost=host,
ctxt=REMOTE)
cmd.run(validateAfter=True)
Loading
Loading