Files
2026-05-18 20:10:00 +02:00

600 lines
25 KiB
Ruby

# frozen_string_literal: true
#-- copyright
# OpenProject is an open source project management software.
# Copyright (C) the OpenProject GmbH
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 3.
#
# OpenProject is a fork of ChiliProject, which is a fork of Redmine. The copyright follows:
# Copyright (C) 2006-2013 Jean-Philippe Lang
# Copyright (C) 2010-2013 the ChiliProject Team
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# See COPYRIGHT and LICENSE files for more details.
#++
# Will create journals for a journable (e.g. WorkPackage and Meeting)
# As a journal is basically a copy of the current state of the database, consisting of the journable as well as its
# custom values and attachments, those entries are copied in the database.
# Copying and thereby creation only takes place if a change of the current state and the last journal is identified.
# Note, that the adequate creation of a journal which represents the state that is generated by a single user action depends on
# no other user/action altering the current state at the same time especially in a multi process/thread setup.
# Therefore, the whole update of a journable needs to be safeguarded by a mutex. In our implementation, we use
#
# OpenProject::Mutex.with_advisory_lock_transaction(journable)
#
# for this purpose.
# rubocop:disable Rails/SquishedSQLHeredocs
module Journals
class CreateService
include Helpers
attr_reader :journable, :user, :associations
def initialize(journable, user)
@user = user
@journable = journable
@associations = Association.for(journable)
end
def call(notes: "", internal: false, cause: CauseOfChange::NoCause.new)
Journal.transaction do
journal = create_journal(notes, internal, cause)
if journal
reload_journals
end
ServiceResult.success result: journal
end
end
private
# Aggregatable journals must meet the following criteria:
#
# * The journalizing happens within the configured aggregation time
# * The journalizing is carried out by the same user
# * The cause is identical
# * ONLY one note exists between the predecessor and the journal to be created
# * The predecessor and the journal to be created have the same restriction
#
# Instead of removing the predecessor, return it here so that it can be stripped in the journal creating
# SQL to than be refilled. That way, references to the journal, including ones users have, are kept intact.
def aggregatable_predecessor(notes, internal, cause)
predecessor = journable.last_journal
predecessor if aggregatable?(predecessor, notes, internal, cause)
end
def create_journal(notes, internal, cause)
predecessor = aggregatable_predecessor(notes, internal, cause)
log_journal_creation(predecessor)
create_sql = create_journal_sql(predecessor, notes, internal, cause)
# We need to ensure that the result is genuine. Otherwise,
# calling the service repeatedly for the same journable
# could e.g. return a (query cached) journal creation
# that then e.g. leads to the later code thinking that a journal was
# created.
result = Journal.connection.uncached do
::Journal
.connection
.select_one(create_sql)
end
Journal.instantiate(result) if result
end
# The result of the whole SQL statement is a snapshot of the journable (e.g. a WorkPackage or a WikPage) at the point
# the SQL statement was run:
# * There will be either a newly created entry in the `journals` table or an updated entry in it if the former journal
# of the journable was aggregated (two consecutive updates within a configurable time frame by the same user).
# * A new entry in the data table of the journable (e.g. work_package_journals for a WorkPackage) will be created
# containing a copy of the columns of the journable.
# * New entries in the attachable_journals table, one for every attachment the journable has at the time.
# * New entries in the customizable_journals table, one for every custom value the journable has at the time.
# * New entries in the storages_file_links_journals table, one for file link value the journable has at the time.
# * New entries in the meeting_agenda_item_journals table, one for agenda_item the journable has at the time.
#
# It consists of a couple of parts that are kept as individual queries (as CTEs) but
# are all executed within a single database call.
#
# The next CTEs (`max_journals`) responsibility is to fetch the latest journal and have that available for later queries
# (i.e. when determining the latest state of the journable, when getting the current version number and when
# comparing the timestamps of the last journalization time and the work package's updated_at time).
#
# The next CTE (`changes`) determines whether a change as occurred so that a new journal needs to be created. This check
# is carried out the check if journalization needs to be carried out at all. To determine
# whether a change is worthy of being journalized, the current and the latest journalized state are compared in four aspects:
# * the journable's table columns are compared to the columns in the journable's journal data table
# (e.g. work_package_journals for WorkPackages). Only columns that exist in the journable's journal data table are considered
# (and some columns like the primary key `id` is ignored). Therefore, to add an attribute to be journalized, it needs to
# be added to that table.
# * the journable's attachments are compared to the attachable_journals entries being associated with the most recent journal.
# * the journable's custom values are compared to the customizable_journals entries being associated with the most
# recent journal.
# * the journable's file_links are compared to the storages_file_links_journals entries being associated with the most
# recent journal.
# * the journable's meeting_agenda_items are compared to the meeting_agenda_item_journals entries being associated with the
# most recent journal.
# When comparing text based values, newlines are normalized as otherwise users having a different OS might change a text value
# without intending to.
#
# Journalization continues only if
# * a change has been identified (by the `changes` CTE)
# * OR a note is present
# * OR a cause is present (which would be different from the cause of the predecessor as that one would otherwise be
# aggregated with)
# * OR a predecessor is replaced
#
# If a change has been identified (not for a note or a cause) and a predecessor to aggregate with exists, the predecessor's
# data is stripped. this is done by the CTEs 'cleanup_predecessor_data' and the ones for the associated data,
# 'cleanup_predecessor_XYZ' (where XYZ is e.g. attachable or customizable). If no predecessor exists,
# a noop SQL statement is run instead.
# To strip the information from the journal, the data record (e.g. from work_packages_journals) as well as the
# associated data information is removed. The journal itself is kept and will later on have its
# updated_at and possibly its notes property updated.
#
# To enforce consistent timestamps throughout the data structure of journable and journal, the time used for further timestamp
# setting is then calculated once between the two CTEs `touch_journable` and `fetch_time`. The auxiliary tables
# (attachable_journals, customizable_journals and storages_file_links_journals) don't have timestamps so they can be
# disregarded.
#
# Most of the time, the time used will be the updated_at of the journable. This follows the logic that most of the time
# the journable receives new attributes first which will subsequently trigger the run of this service. During the course
# of the journable saving, the updated_at will receive a current timestamp by Rails. But if only a cause or a note is added,
# or if a custom value, an attachable or a file_link is altered, the journable will not have been touched before and
# therefore the time this SQL statement is run at will be used. The SQL will in this case touch the journable with that
# timestamp itself (`touch_journable`).
# Whether the journable was updated before or the SQL statement did it, the value of either will end up in the result
# of the `fetch_time` CTE to be used in the later stages of the SQL.
#
# After the SQL is run, the timestamps of the journable, the predecessor journal and the newly created journal will have
# interdependencies:
# * If a new journal is created (i.e. no predecessor is aggregated):
# * The updated_at of the journable, the newly created journal's created_at, updated_at and the lower bound of its
# validity_period as well as the upper bound of the predecessor's validity_period will be the same (`fetch_time` value).
# * The upper bound of the newly created journal's validity_period will be NULL meaning that it does not end yet.
# * If a predecessor is aggregated:
# * The updated_at of the journable and the aggregated journal's updated_at will be the same.
# * The created_at of the aggregated journal as well as its lower bound of the validity_period will be the same as
# before.
# The timestamps are updated by `touch_journable` and `update_predecessor` respectively.
#
# In case of no aggregation, the preceding journal will now have values for both the upper as well as the lower bound
# of its validity_range. Such a journal can then be considered closed.
#
# The `inserted_journal` will either update the updated_at value of the aggregated predecessor or, in the absence
# of an aggregated predecessor create a new journal with the timestamps as described above. Both rely on the return
# value of `insert_data'. That CTE, on the basis of what was identified in `changes` (and only if there are some)
# writes the snapshot of the journables column into the correct data journal (e.g work_package_journals for a
# WorkPackage). The return values of the `insert_data` is relevant also for the `inserted_data` CTE as the `data_id` field
# needs to be inserted into the journal. This is also the reason why the `insert_data` CTE is run before
# the `inserted_journal`.
#
# All cases (having a change, a note or a cause) can at this point be identified by a journal having been created
# or replaced which are treated the same.
# Therefore, the return value of the `inserted_journal` is further on used to identify whether the next statements
# (`insert_attachable`, `insert_customizable` and `insert_storable`) should actually insert data. It is additionally
# used as the values returned by the overall SQL statement so that an AR instance can be instantiated with it.
#
def create_journal_sql(predecessor, notes, internal, cause)
journal_modifications = journal_modification_sql(predecessor, notes, internal, cause)
relation_modifications = relation_modifications_sql(predecessor, notes, cause)
journal_cte_clauses = [journal_modifications]
journal_cte_clauses << relation_modifications if relation_modifications.any?
<<~SQL
WITH #{journal_cte_clauses.join(',')}
SELECT * from inserted_journal
SQL
end
def journal_modification_sql(predecessor, notes, internal, cause)
<<~SQL
max_journals AS (
#{select_max_journal_sql}
), changes AS (
#{select_changed_sql}
), cleanup_predecessor_data AS (
#{cleanup_predecessor_data_sql(predecessor, notes, cause)}
), touch_journable AS (
#{touch_journable_sql(notes, cause)}
), fetch_time AS (
#{fetch_time_sql}
), insert_data AS (
#{insert_data_sql(notes, cause)}
), update_predecessor AS (
#{update_predecessor_sql(predecessor, notes, cause)}
), inserted_journal AS (
#{update_or_insert_journal_sql(predecessor, notes, internal, cause)}
)
SQL
end
def relation_modifications_sql(predecessor, notes, cause)
associations.map do |association|
association_modifications_sql(association, predecessor, notes, cause)
end
end
def association_modifications_sql(association, predecessor, notes, cause)
<<~SQL
cleanup_predecessor_#{association.name} AS (
#{association.cleanup_predecessor(predecessor, notes, cause)}
), insert_#{association.name} AS (
#{association.insert_sql}
)
SQL
end
def select_max_journal_sql
sanitize(<<~SQL, journable_id:, journable_type:)
SELECT
:journable_id journable_id,
:journable_type journable_type,
updated_at,
COALESCE(journals.version, fallback.version) AS version,
COALESCE(journals.id, 0) id,
COALESCE(journals.data_id, 0) data_id
FROM
journals
RIGHT OUTER JOIN
(SELECT 0 AS version) fallback
ON
journals.journable_id = :journable_id
AND journals.journable_type = :journable_type
AND journals.version IN (
SELECT MAX(version)
FROM journals
WHERE journable_id = :journable_id
AND journable_type = :journable_type
)
SQL
end
def select_changed_sql
sql = <<~SQL
SELECT
*
FROM
(#{changes_data_sql}) data_changes
SQL
associations.each do |association|
sql += <<~SQL
FULL JOIN
(#{association.changes_sql}) #{association.name}_changes
ON
#{association.name}_changes.journable_id = data_changes.journable_id
SQL
end
sql
end
def cleanup_predecessor_data_sql(predecessor, notes, cause)
cleanup_predecessor_for(predecessor,
notes,
cause,
data_table_name,
:id,
:data_id)
end
# Updates the updated_at timestamp of the journable.
#
# Whenever an attribute is updated on the journable before creating the journal, the updated_at timestamp
# will already have been increased so nothing needs to be done.
# But if any of the associated data is updated or if only a cause or note is added, the journable would
# otherwise not have received an updated timestamp.
#
# Therefore, this is only carried out if:
# * if there are changes or a note or a cause
# * AND the journable doesn't already have a newer timestamp than the most recent journal
def touch_journable_sql(notes, cause)
if journable.class.aaj_options[:timestamp].to_sym == :updated_at
sql = <<~SQL
UPDATE
#{journable_table_name}
SET
updated_at = statement_timestamp()
WHERE
id = :id
#{only_on_changed_or_forced_condition_sql(notes, cause)}
AND NOT updated_at > (SELECT updated_at FROM max_journals)
RETURNING updated_at
SQL
sanitize(sql,
id: journable.id)
else
<<~SQL
SELECT NULL::timestamp with time zone AS updated_at
SQL
end
end
# Fetches the timestamp to be used by all subsequent SQL statements e.g. for
# * setting the created_at and updated_at timestamps of the newly created journal
# * setting the updated_at timestamp on an updated (aggregated with) journal
# * setting the validity_period (upper bound) of the preceding journal.
def fetch_time_sql
sanitize(<<~SQL, journable_timestamp:)
SELECT COALESCE((SELECT updated_at FROM touch_journable), :journable_timestamp) AS updated_at
SQL
end
def insert_data_sql(notes, cause)
sanitize(<<~SQL, journable_id:)
INSERT INTO
#{data_table_name} (
#{data_sink_columns}
)
SELECT
#{data_source_columns}
FROM #{journable_table_name}
#{journable_data_sql_addition}
WHERE
#{journable_table_name}.id = :journable_id
#{only_on_changed_or_forced_condition_sql(notes, cause)}
RETURNING *
SQL
end
# Sets the validity_period's upper boundary of the preceding journal to the created_at timestamp of the inserted journal.
# The upper bound set is not included.
# If there is a predecessor (meaning we are aggregating/updating an existing journal), nothing is done.
# In that case, the preceding journal is the one we are currently aggregating with so it will still remain
# the most recent one.
def update_predecessor_sql(predecessor, notes, cause)
return "SELECT 1" if predecessor.present?
<<~SQL
UPDATE
journals
SET
validity_period = tstzrange(lower(validity_period), (SELECT updated_at FROM fetch_time), '[)')
WHERE
id = (SELECT id from max_journals)
#{only_on_changed_or_forced_condition_sql(notes, cause)}
SQL
end
def update_or_insert_journal_sql(predecessor, notes, internal, cause)
if predecessor
update_journal_sql(predecessor, notes, cause)
else
insert_journal_sql(notes, internal, cause)
end
end
def update_journal_sql(predecessor, notes, cause)
# If there is a predecessor, we don't want to create a new one, we simply rewrite it.
# The original data of that predecessor (data e.g. work_package_journals, customizable_journals, attachable_journals)
# has been deleted before but the notes need to be taken over and the timestamps updated as if the
# journal would have been created.
#
# A lot of the data does not need to be set anew, since we only aggregate if that data stays the same
# (e.g. the user_id).
#
# In case there is no change at all, the journal will not need to be modified. But even
# without a change, having notes or a cause will require to have the journal written.
sql = <<~SQL
UPDATE
journals
SET
notes = :notes,
updated_at = (SELECT updated_at FROM fetch_time),
data_id = insert_data.id,
cause = :cause
FROM insert_data
WHERE journals.id = :predecessor_id
#{only_on_changed_or_forced_condition_sql(notes, cause)}
RETURNING
journals.*
SQL
sanitize(sql,
notes: notes.presence || predecessor.notes,
predecessor_id: predecessor.id,
cause: cause_sql(cause))
end
def insert_journal_sql(notes, internal, cause)
sql = <<~SQL
INSERT INTO
journals (
journable_id,
journable_type,
version,
user_id,
notes,
restricted,
created_at,
updated_at,
data_id,
data_type,
cause,
validity_period
)
SELECT
:journable_id,
:journable_type,
COALESCE(max_journals.version, 0) + 1,
:user_id,
:notes,
:restricted,
(SELECT updated_at FROM fetch_time),
(SELECT updated_at FROM fetch_time),
insert_data.id,
:data_type,
:cause,
tstzrange((SELECT updated_at FROM fetch_time), NULL)
FROM max_journals, insert_data
RETURNING *
SQL
sanitize(sql,
notes:,
restricted: internal,
cause: cause_sql(cause),
journable_id:,
journable_type:,
user_id: user.id,
data_type: journable.class.journal_class.name)
end
def changes_data_sql
sanitize(<<~SQL, journable_id:)
SELECT
#{journable_table_name}.id journable_id
FROM
(SELECT * FROM #{journable_table_name} #{journable_data_sql_addition}) #{journable_table_name}
LEFT JOIN
(SELECT * FROM max_journals
JOIN
#{data_table_name}
ON
#{data_table_name}.id = max_journals.data_id) #{data_table_name}
ON
#{journable_table_name}.id = #{data_table_name}.journable_id
WHERE
#{journable_table_name}.id = :journable_id AND (#{data_changes_condition_sql})
SQL
end
def data_changes_condition_sql
data_table = data_table_name
journable_table = journable_table_name
data_changes = (journable.journaled_columns_names - text_column_names).map do |column_name|
<<~SQL
(#{journable_table}.#{column_name} IS DISTINCT FROM #{data_table}.#{column_name})
SQL
end
data_changes += text_column_names.map do |column_name|
<<~SQL
#{normalize_newlines_sql("#{journable_table}.#{column_name}")} !=
#{normalize_newlines_sql("#{data_table}.#{column_name}")}
SQL
end
data_changes.join(" OR ")
end
def data_sink_columns
text_columns = text_column_names
(journable.journaled_columns_names - text_columns + text_columns).join(", ")
end
def data_source_columns
text_columns = text_column_names
normalized_text_columns = text_columns.map { |column| normalize_newlines_sql(column) }
(journable.journaled_columns_names - text_columns + normalized_text_columns).join(", ")
end
def journable_data_sql_addition
journable.class.aaj_options[:data_sql]&.call(journable) || ""
end
def text_column_names
journable.class.columns_hash.select { |_, v| v.type == :text }.keys.map(&:to_sym) & journable.journaled_columns_names
end
def journable_timestamp
journable.send(journable.class.aaj_options[:timestamp])
end
def journable_type
journable.class.base_class.name
end
def journable_table_name
journable.class.table_name
end
def data_table_name
journable.class.journal_class.table_name
end
def cause_sql(cause)
# Using the same encoder mechanism that ActiveRecord uses for json/jsonb columns
ActiveSupport::JSON.encode(cause || {})
end
# Because we added the journal via bare metal sql, rails does not yet
# know of the journal. If the journable has the journals loaded already,
# the caller might expect the journals to also be updated so we do it for him.
def reload_journals
journable.journals.reload if journable.journals.loaded?
end
def aggregatable?(predecessor, notes, internal, cause)
predecessor.present? &&
aggregation_active? &&
within_aggregation_time?(predecessor) &&
same_user?(predecessor) &&
only_one_or_same_cause?(predecessor, cause) &&
only_one_note?(predecessor, notes) &&
same_restriction?(predecessor, internal)
end
def aggregation_active?
Setting.journal_aggregation_time_minutes.to_i > 0
end
def within_aggregation_time?(predecessor)
minutes = journable.class.try(:journal_aggregation_time_minutes) ||
Setting.journal_aggregation_time_minutes.to_i
predecessor.updated_at >= (Time.zone.now - minutes.minutes)
end
def same_user?(predecessor)
predecessor.user_id == user.id
end
def only_one_or_same_cause?(predecessor, cause)
predecessor.cause.empty? || cause.blank? || predecessor.cause == cause
end
def only_one_note?(predecessor, notes)
predecessor.notes.empty? || notes.empty?
end
def same_restriction?(predecessor, internal)
predecessor.internal == internal
end
def log_journal_creation(predecessor)
if predecessor
Rails.logger.debug { "[#{self.class.name}] Aggregating journal #{predecessor.id} for #{journable_type} ##{journable.id}" }
else
Rails.logger.debug { "[#{self.class.name}] Inserting new journal for #{journable_type} ##{journable.id}" }
end
end
end
end
# rubocop:enable Rails/SquishedSQLHeredocs