handle importing posts from json dumps

staging
multiple creatures 2019-05-21 01:58:07 -05:00
parent cbdadfb5fa
commit d6f37c6ae0
10 changed files with 231 additions and 14 deletions

View File

@ -89,6 +89,8 @@ class ActivityPub::Activity
def distribute(status)
crawl_links(status)
return if @options[:imported]
notify_about_reblog(status) if reblog_of_local_account?(status)
notify_about_mentions(status)

View File

@ -2,7 +2,7 @@
class ActivityPub::Activity::Announce < ActivityPub::Activity
def perform
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
return reject_payload! if !@options[:imported] && (delete_arrived_first?(@json['id']) || !related_to_local_activity?)
original_status = status_from_object
@ -15,10 +15,11 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
status = Status.create!(
account: @account,
reblog: original_status,
uri: @json['id'],
uri: @options[:imported] ? nil : @json['id'],
created_at: @json['published'],
override_timestamps: @options[:override_timestamps],
visibility: visibility_from_audience
visibility: visibility_from_audience,
imported: @options[:imported] == true
)
distribute(status)

View File

@ -2,13 +2,27 @@
class ActivityPub::Activity::Create < ActivityPub::Activity
def perform
return reject_payload! if unsupported_object_type? || invalid_origin?(@object['id']) || Tombstone.exists?(uri: @object['id']) || !related_to_local_activity?
return reject_payload! if unsupported_object_type? || !@options[:imported] && (invalid_origin?(@object['id']) || Tombstone.exists?(uri: @object['id']) || !related_to_local_activity?)
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
return if delete_arrived_first?(object_uri) || poll_vote?
return if !@options[:imported] && (delete_arrived_first?(object_uri) || poll_vote?)
@status = find_existing_status
if @options[:imported]
if object_uri.present?
@origin_hash = obfuscate_origin(object_uri)
elsif @object['url'].present?
@origin_hash = obfuscate_origin(@object['url'])
elsif @object['atomUri'].present?
@origin_hash = obfuscate_origin(@object['atomUri'])
else
@origin_hash = nil
end
@status = @origin_hash.present? ? find_imported_status : nil
else
@status = find_existing_status
end
if @status.nil?
process_status
@ -37,6 +51,13 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
@params[:visibility] = :unlisted if @params[:visibility] == :public && @account.force_unlisted?
@params[:sensitive] = true if @account.force_sensitive?
if @options[:imported]
@params.except!(:uri, :url)
@params[:content_type] = 'text/html'
@params[:imported] = true
@params[:origin] = @origin_hash unless @origin_hash.nil?
end
ApplicationRecord.transaction do
@status = Status.create!(@params)
attach_tags(@status)
@ -44,7 +65,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
resolve_thread(@status)
fetch_replies(@status)
distribute(@status)
return if @options[:imported]
forward_for_reply if @status.public_visibility? || @status.unlisted_visibility?
end
@ -52,11 +75,19 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
status = status_from_uri(object_uri)
end
def find_imported_status
status = Status.find_by(origin: @origin_hash)
end
def obfuscate_origin(key)
key.sub(/^http.*?\.\w+\//, '').gsub(/\H+/, '')
end
def process_status_params
@params = begin
{
uri: @object['id'],
url: object_url || @object['id'],
url: (!@options[:imported] && object_url) || @object['id'],
account: @account,
text: text_from_content || '',
language: detected_language,
@ -150,7 +181,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
hashtag = tag['name'].gsub(/\A#/, '').mb_chars.downcase
return if hashtag.starts_with?('self:', '_self:', 'local:', '_local:')
return if !@options[:imported] && hashtag.starts_with?('self.', '_self.', 'local.', '_local.')
hashtag = Tag.where(name: hashtag).first_or_create!(name: hashtag)
@ -173,7 +204,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
end
def process_emoji(tag)
return if skip_download?
return if @options[:imported] || skip_download?
return if tag['name'].blank? || tag['icon'].blank? || tag['icon']['url'].blank?
shortcode = tag['name'].delete(':')

View File

@ -17,14 +17,14 @@
#
class Import < ApplicationRecord
FILE_TYPES = %w(text/plain text/csv).freeze
FILE_TYPES = %w(text/plain text/html text/csv application/json application/json+ld).freeze
MODES = %i(merge overwrite).freeze
self.inheritance_column = false
belongs_to :account
enum type: [:following, :blocking, :muting, :domain_blocking]
enum type: [:following, :blocking, :muting, :domain_blocking, :statuses]
validates :type, presence: true

View File

@ -29,6 +29,9 @@
# network :boolean default(FALSE), not null
# content_type :string
# footer :text
# edited :boolean
# imported :boolean
# origin :string
#
class Status < ApplicationRecord

View File

@ -3,7 +3,28 @@
require 'csv'
class ImportService < BaseService
include RoutingHelper
include JsonLdHelper
ROWS_PROCESSING_LIMIT = 20_000
CONTENT_TYPES = %w(text/bbcode+markdown text/markdown text/bbcode text/html text/plain).freeze
VISIBILITIES = [:public, :unlisted, :private, :direct, :limited].freeze
IMPORT_STATUS_ATTRIBUTES = [
'id',
'content_type',
'spoiler_text',
'text',
'footer',
'in_reply_to_id',
'reply',
'reblog_of_id',
'created_at',
'conversation_id',
'sensitive',
'language',
'local_only',
'visibility',
].freeze
def call(import)
@import = import
@ -18,6 +39,8 @@ class ImportService < BaseService
import_mutes!
when 'domain_blocking'
import_domain_blocks!
when 'statuses'
import_statuses!
end
end
@ -63,6 +86,138 @@ class ImportService < BaseService
end
end
def import_statuses!
parse_import_data_json!
return if @data.nil?
if @import.overwrite?
@account.statuses.without_reblogs.reorder(nil).find_in_batches do |statuses|
BatchedRemoveStatusService.new.call(statuses)
end
end
return import_activitypub if @data.kind_of?(Hash) && @data['orderedItems'].present?
return unless @data.kind_of?(Array)
import_json_statuses
end
def import_json_statuses
@account.vars['_bangtags:disable'] = true
@account.save
@data.each do |json|
# skip if invalid status object
next if json.nil?
next unless json.kind_of?(Hash)
json.slice!(*IMPORT_STATUS_ATTRIBUTES)
json.compact!
next if json.blank?
# skip if missing reblog
unless json['reblog_of_id'].nil?
json['reblog_of_id'] = json['reblog_of_id'].to_i
next unless (json['reblog_of_id'] != 0 ? Status.where(id: json['reblog_of_id']).exists? : false)
end
# convert iso8601 strings to DateTime object
json['created_at'] = json['created_at'].kind_of?(String) ? DateTime.iso8601(json['created_at']).utc : Time.now.utc
if json['id'].blank?
json['id'] = nil
else
# make sure id is an integer
status_id = json['id'].to_i
json['id'] = status_id != 0 ? status_id : nil
# check for duplicate
existing_status = Status.find_by_id(json['id'])
unless existing_status.nil?
# skip if duplicate
next if (json['created_at'] - existing_status.created_at).abs < 1
# else drop the conflicting id value
json['id'] = nil
end
end
# ensure correct values & value types
json['content_type'] = 'text/plain' unless CONTENT_TYPES.include?(json['content_type'])
json['spoiler_text'] = '' unless json['spoiler_text'].kind_of?(String)
json['text'] = '' unless json['text'].kind_of?(String)
json['footer'] = nil unless json['footer'].kind_of?(String)
json['reply'] = [true, 1, "1"].include?(json['reply'])
json['in_reply_to_id'] = json['in_reply_to_id'].to_i unless json['in_reply_to_id'].nil?
json['conversation_id'] = json['conversation_id'].to_i unless json['conversation_id'].nil?
json['sensitive'] = [true, 1, "1"].include?(json['sensitive'])
json['language'] = 'en' unless json['language'].kind_of?(String) && json['language'].length > 1
json['language'] = ISO_639.find(json['language'])&.alpha2 || @account.user_default_language&.presence || 'en'
json['local_only'] = @account.user_always_local_only? || [true, 1, "1"].include?(json['local_only'])
json['visibility'] = VISIBILITIES[json['visibility'].to_i] || :unlisted
json['imported'] = true
# drop a nonexistant conversation id
unless (json['conversation_id'] != 0 ? Conversation.where(id: json['conversation_id']).exists? : false)
json['conversation_id'] = nil
end
# nullify a missing reply
unless (json['in_reply_to_id'] != 0 ? Status.where(id: json['in_reply_to_id']).exists? : false)
json['in_reply_to_id'] = nil
end
ApplicationRecord.transaction do
status = @account.statuses.create!(json.compact.symbolize_keys)
process_hashtags_service.call(status)
process_mentions_service.call(status, skip_notify: true)
end
rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound, Mastodon::ValidationError => e
Rails.logger.error "Error importing status (JSON): #{e}"
nil
end
@account.vars.delete('_bangtags:disable')
@account.save
end
def import_activitypub
account_uri = ActivityPub::TagManager.instance.uri_for(@account)
followers_uri = account_followers_url(@account)
@data["orderedItems"].each do |activity|
next if activity['object'].blank?
next unless %w(Create Announce).include?(activity['type'])
object = activity['object']
activity['actor'] = account_uri
activity['to'] = if activity['to'].kind_of?(Array)
activity['to'].uniq.map { |to| to.end_with?('/followers') ? followers_uri : to }
else
[account_uri]
end
activity['cc'] = if activity['cc'].kind_of?(Array)
activity['cc'].uniq.map { |cc| cc.end_with?('/followers') ? followers_uri : cc }
else
[]
end
case activity['type']
when 'Announce'
next unless object.kind_of?(String)
when 'Note'
next unless object.kind_of?(Hash)
object['attributedTo'] = account_uri
object['to'] = activity['to']
object['cc'] = activity['cc']
object.delete('attachment')
end
activity = ActivityPub::Activity.factory(activity, @account, imported: true)
activity&.perform
rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound, Mastodon::ValidationError => e
Rails.logger.error "Error importing status (ActivityPub): #{e}"
nil
end
end
def import_relationships!(action, undo_action, overwrite_scope, limit, extra_fields = {})
items = @data.take(limit).map { |row| [row['Account address']&.strip, Hash[extra_fields.map { |key, header| [key, row[header]&.strip] }]] }.reject { |(id, _)| id.blank? }
@ -89,6 +244,14 @@ class ImportService < BaseService
data = CSV.parse(import_data, headers: true)
data = CSV.parse(import_data, headers: default_headers) unless data.headers&.first&.strip&.include?(' ')
@data = data.reject(&:blank?)
rescue CSV::MalformedCSVError
@data = nil
end
def parse_import_data_json!
@data = Oj.load(import_data, mode: :strict)
rescue Oj::ParseError
@data = []
end
def import_data
@ -98,4 +261,12 @@ class ImportService < BaseService
def follow_limit
FollowLimitValidator.limit_for_account(@account)
end
def process_mentions_service
ProcessMentionsService.new
end
def process_hashtags_service
ProcessHashtagsService.new
end
end

View File

@ -84,3 +84,9 @@ else
url: (ENV['PAPERCLIP_ROOT_URL'] || '/system') + '/:class/:attachment/:id_partition/:style/:filename',
)
end
Paperclip.options[:content_type_mappings] = {
json: %w(text/plain text/html application/json application/json+ld),
jsonld: %w(text/plain text/html application/json application/json+ld),
csv: %w(text/plain),
}

View File

@ -714,7 +714,6 @@ en:
domain_blocking: Domain blocking list
following: Following list
muting: Muting list
profile: Profile information (JSON)
statuses: Roars (JSON)
upload: Upload
in_memoriam_html: In Memoriam.

View File

@ -48,7 +48,7 @@ en:
featured_tag:
name: 'You might want to use one of these:'
imports:
data: CSV file exported from another Monsterpit server
data: CSV or JSON file exported from another server
invite_request:
text: This will help us review your join request
sessions:

View File

@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 2019_05_19_130537) do
ActiveRecord::Schema.define(version: 2019_05_21_003909) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"
@ -650,10 +650,14 @@ ActiveRecord::Schema.define(version: 2019_05_19_130537) do
t.boolean "network", default: false, null: false
t.string "content_type"
t.text "footer"
t.boolean "edited"
t.boolean "imported"
t.string "origin"
t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20180106", order: { id: :desc }
t.index ["in_reply_to_account_id"], name: "index_statuses_on_in_reply_to_account_id"
t.index ["in_reply_to_id"], name: "index_statuses_on_in_reply_to_id"
t.index ["network"], name: "index_statuses_on_network", where: "network"
t.index ["origin"], name: "index_statuses_on_origin", unique: true
t.index ["reblog_of_id", "account_id"], name: "index_statuses_on_reblog_of_id_and_account_id"
t.index ["tsv"], name: "tsv_idx", using: :gin
t.index ["uri"], name: "index_statuses_on_uri", unique: true