Uploaded image for project: 'ejabberd development'
  1. ejabberd development
  2. EJAB-280

mod_archive replication support + bugfixes

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: ejabberd 1.1.3
    • Fix Version/s: None
    • Component/s: XMPP Compliance
    • Labels:
      None

      Description

      According to XEP-136, server implementation must allow the client to replicate archive, i.e. get all changes in archive stored on server side since last synchronization. This feature is very important for proper client support implementation, as it allows faster operations (using locally cached archive) as well as searches through the archive.

      Here is the patch to mod_archive that adds preliminary support for this feature. I haven't tested all possible scenarios yet, but examples listed in XEP-136 seem to be processed correctly.

      Besides of that, there are two bugs solved in this patch:

      1) In current implementation values for 'save' attribute are either 'true' or 'false', but according to XEP-136, they have to be one of 'false', 'body', 'message', 'stream' - thus, 'true' was changed to 'body' ('message' and 'stream' seem to be rarely needed so their support is not that important).

      2) There were 2 errors in RSM implementation, which prevented it from functioning on significant number of examples. To verify the error and check the fix you can use RSM examples similar to those listed in XEP-136: before the fix, for "after" request the implementation gave incorrect results and "before" one just resulted in internal error, the fix seem to solve all issues I was able to spot (tested on collections listing, collections retrieval and "modified" requests).

      Patch is for https://svn.process-one.net/ejabberd-modules/mod_archive/trunk/src/mod_archive.erl

      Index: mod_archive.erl
      ===================================================================
      — mod_archive.erl (revision 165)
      +++ mod_archive.erl (working copy)
      @@ -71,6 +71,14 @@
      message_list = [],
      subject = ""}).

      +-record(archive_changes,
      +

      {usjs, + us, + jid, + start, + change_type, + change_time}

      ).
      +
      -record(msg,

      {direction, secs, body}

      ).

      %%====================================================================
      @@ -121,6 +129,9 @@
      mnesia:create_table(archive_message,
      [

      {disc_copies, [node()]},
      {attributes, record_info(fields, archive_message)}]),
      + mnesia:create_table(archive_changes,
      + [{disc_copies, [node()]}

      ,
      +

      {attributes, record_info(fields, archive_changes)}

      ]),
      % mnesia:add_table_index(archive_options, us),
      mnesia:add_table_index(archive_message, us),
      ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50),
      @@ -243,6 +254,7 @@
      "retrieve" -> process_local_iq_retrieve(From, To, IQ);
      "save" -> process_local_iq_store(From, To, IQ);
      "remove" -> process_local_iq_remove(From, To, IQ);
      + "modified" -> process_local_iq_modified(From, To, IQ);
      _ -> IQ#iq

      {type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}

      end
      @@ -256,6 +268,11 @@
      F = fun() ->
      lists:foreach(
      fun(R) ->
      + CL = #archive_changes

      {usjs = R#archive_message.usjs, + us = R#archive_message.us, + jid = R#archive_message.jid, + start = R#archive_message.start}

      ,
      + mnesia:delete_object(CL),
      mnesia:delete_object(R)
      end,
      mnesia:index_read(archive_message, US, #archive_message.us)),
      @@ -323,7 +340,14 @@
      message_list = [Message]};
      [E] -> E#archive_message

      {message_list=lists:append(E#archive_message.message_list, [Message])}

      end,

      • mnesia:write(NE)
        + mnesia:write(NE),
        + CL = #archive_changes {usjs = NE#archive_message.usjs, + us = NE#archive_message.us, + jid = NE#archive_message.jid, + start = NE#archive_message.start, + change_type = changed, + change_time = Tm}

        ,
        + mnesia:write(CL)
        end),
        NStorages.

      @@ -419,7 +443,11 @@
      AutoSave -> "true";
      true -> "false"
      end,

      • [
        Unknown macro: {xmlelement, "default", [{"save", "false"}, {"otr", "forbid"}], []},
        + SaveMode = if
        + AutoSave -> "body";
        + true -> "false"
        + end,
        + [{xmlelement, "default", [{"save", SaveMode}, {"otr", "forbid"}], []}

        ,

        Unknown macro: {xmlelement, "auto", [{"save", SaveAttr}], []}

        ].

      @@ -452,7 +480,7 @@
      % parse the list of xml element, and modify the given archive_option
      transaction_parse_save_elem(Options, [

      {xmlelement, "default", Attrs, _}

      | Tail]) ->
      V = case xml:get_attr_s("save", Attrs) of

      • "true" -> true;
        + "body" -> true;
        "false" -> false;
        _ -> unset
        end,
        @@ -596,7 +624,16 @@
        case parse_store_element (LUser, LServer, SubEl) of
        {error, E} -> IQ#iq{type = error, sub_el = [SubEl, E]};
        Collection ->
        - case mnesia:transaction(fun() -> mnesia:write(Collection) end) of
        + case mnesia:transaction(fun() ->
        + mnesia:write(Collection),
        + CL = #archive_changes{usjs = Collection#archive_message.us, + us = Collection#archive_message.us, + jid = Collection#archive_message.jid, + start = Collection#archive_message.start, + change_type = changed, + change_time = get_timestamp()},
        + mnesia:write(CL)
        + end) of
        {atomic, _} ->
        IQ#iq{type = result, sub_el=[]};
        _ ->
        @@ -778,7 +815,16 @@
        end.

        process_remove_index(Index) ->
        - case mnesia:transaction(fun() -> mnesia:delete({archive_message, Index}) end) of
        + case mnesia:transaction(fun() ->
        + {LUser, LServer, Jid, Start} = Index,
        + CL = #archive_changes{usjs = Index,
        + us = {LUser, LServer},
        + jid = Jid,
        + start = Start,
        + change_type = removed,
        + change_time = get_timestamp()},
        + mnesia:write(CL),
        + mnesia:delete({archive_message, Index}) end) of
        {atomic, _} ->
        ok;
        {aborted, _} ->
        @@ -791,7 +837,16 @@
        start='$1', message_list='', subject = ''},
        Guard = [{'>=', '$1', Start},{'<', '$1', End}],

        - lists:foreach(fun(R) -> mnesia:delete_object(R) end,
        + lists:foreach(fun(R) ->
        + CL = #archive_changes{usjs = R#archive_message.usjs, + us = R#archive_message.us, + jid = R#archive_message.jid, + start = R#archive_message.start, + change_type = removed, + change_time = get_timestamp()},
        + mnesia:write(CL),
        + mnesia:delete_object(R)
        + end,
        mnesia:select(archive_message, [{Pat, Guard, ['$_']}]))
        end,

        @@ -819,6 +874,24 @@
        {error, ?ERRT_INTERNAL_SERVER_ERROR("", "plop")}
        end.

        +% return {ok, [{#archive_message}]} or {error, xmlelement}
        +% With is a tuple Jid, or '_'
        +get_modified(LUser, LServer, Start, End, With) ->
        + case mnesia:transaction(fun() ->
        + Pat = #archive_changes{usjs= '_',
        + us = {LUser, LServer},
        + jid= With,
        + start='$1',
        + change_type='_',
        + change_time='_'},
        + Guard = [{'>=', '$1', Start},{'<', '$1', End}]
        ,
        + mnesia:select(archive_changes, [{Pat, Guard, ['$_']}])
        + end) of
        + {atomic, Result} ->
        + {ok, Result};
        + {aborted, _} ->
        + {error, ?ERRT_INTERNAL_SERVER_ERROR("", "plop")}
        + end.

        % Index is {LUser, LServer, With, Start}
        get_collection(Index) ->
        @@ -832,7 +905,67 @@



        +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        +%% 10. Replication
        +%%

        +
        +process_local_iq_modified(From, _To, #iq{type = Type, sub_el = SubEl} = IQ) ->
        + #jid{luser = LUser, lserver = LServer} = From,
        + case Type of
        + set ->
        + IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]};
        + get ->
        + {xmlelement, _, Attrs, SubEls} = SubEl,
        + RSM = parse_rsm(SubEls),
        + ?MYDEBUG("RSM Results: ~p ~n", [RSM]),
        + Result = case parse_root_argument(Attrs) of
        + {error, E}

        ->

        {error, E}

        ;
        +

        {interval, Start, Stop}

        ->
        + get_modified(LUser, LServer, Start, Stop, '_');
        +

        {interval, Start, Stop, Jid}

        ->
        + get_modified(LUser, LServer, Start, Stop, Jid);
        +

        {index, Jid, Start}

        ->
        + get_modified(LUser, LServer, Start, infinity, Jid);
        + _ ->

        {error, ?ERR_BAD_REQUEST}

        + end,
        + case Result of
        +

        {ok, Items}

        ->
        + Zero = calendar:datetime_to_gregorian_seconds(1970, 1, 1}, {0, 0, 0),
        + FunId = fun(El) ->
        + Secs = element(7,El) - Zero,
        + jlib:now_to_utc_string(

        {Secs div 1000000, Secs rem 1000000, 0}

        )
        + end,
        + FunCompare = fun(Id, El) ->
        +

        {MegaSecs1, Secs1, _}

        = jlib:datetime_string_to_timestamp(Id),
        + Id1 = MegaSecs1 * 1000000 + Secs1 - Zero,
        +

        {MegaSecs2, Secs2, _}

        = jlib:datetime_string_to_timestamp(FunId(El)),
        + Id2 = MegaSecs2 * 1000000 + Secs2 - Zero,
        + if Id1 == Id2 -> equal;
        + Id1 > Id2 -> greater;
        + Id1 < Id2 -> smaller
        + end
        + end,
        + case catch execute_rsm(RSM, lists:keysort(7, Items), FunId,FunCompare) of
        +

        {error, R} ->
        + IQ#iq{type = error, sub_el = [SubEl, R]};
        + {'EXIT', Errr} ->
        + ?MYDEBUG("INTERNAL ERROR ~p ~n", [Errr]),
        + IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]};
        + {RSM_Elem, Items2} ->
        + Fun = fun(A) ->
        + Seconds= A#archive_changes.start - Zero,
        + Start2 = jlib:now_to_utc_string({Seconds div 1000000, Seconds rem 1000000, 0}),
        + Args = [{"with", jlib:jid_to_string(A#archive_changes.jid)}, {"start", Start2}],
        + {xmlelement, atom_to_list(A#archive_changes.change_type), Args, []}
        + end,
        + IQ#iq{type = result, sub_el = [{xmlelement, "modified", [{"xmlns", ?NS_ARCHIVE}], lists:append(lists:map(Fun, Items2),[RSM_Elem])}]}
        + end;
        + {error, R}

        ->
        + IQ#iq

        {type = error, sub_el = [SubEl, R]}

        + end
        + end.
        +
        %%%%%%%%%%%%%%%%%%%%%%%%%%
        % Utility

      @@ -1029,7 +1162,7 @@
      {index,
      case S of

      {id, IdentIndex}

      ->

      • integer_to_list(length(List) - list_to_integer(IdentIndex));
        + {id, integer_to_list(length(List) - list_to_integer(IdentIndex))}

        ;
        _ -> S
        end};
        _ ->
        @@ -1052,9 +1185,9 @@
        execute_rsm_aux({{id,I}, M, normal} = RSM, [E | Tail], IdFun, Acc) ->
        case IdFun(I, E) of
        smaller ->

      • execute_rsm_aux(RSM, Tail, IdFun, Acc + 1);
        + execute_rsm_aux( {0, M, normal}, [E | Tail], IdFun, Acc);
        _ ->
        - execute_rsm_aux({0, M, normal}

        , [E | Tail], IdFun, Acc)
        + execute_rsm_aux(RSM, Tail, IdFun, Acc + 1)
        end;

      execute_rsm_aux({{id,_}, _, normal}, [], _, Acc) ->

        Activity

        ndl Alexander Tsvyashchenko created issue -
        mremond@process-one.net Mickaël Rémond made changes -
        Field Original Value New Value
        Status Open [ 1 ] Review [ 10009 ]
        mremond@process-one.net Mickaël Rémond made changes -
        Workflow development v2 [ 13830 ] development v3 [ 13919 ]
        mremond@process-one.net Mickaël Rémond made changes -
        Status Review [ 10009 ] Open [ 1 ]
        ndl Alexander Tsvyashchenko made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Resolution Won't Fix [ 2 ]
        cromain@process-one.net Christophe Romain (Inactive) made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        mremond@process-one.net Mickaël Rémond made changes -
        Workflow development v3 [ 13919 ] Development v4 [ 80614 ]

          People

          • Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development