Uploaded image for project: 'ejabberd development'
  1. ejabberd development
  2. EJAB-280

mod_archive replication support + bugfixes

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: ejabberd 1.1.3
    • Fix Version/s: None
    • Component/s: XMPP Compliance
    • Labels:
      None

      Description

      According to XEP-136, a server implementation must allow the client to replicate the archive, i.e. to retrieve all changes made to the archive stored on the server side since the last synchronization. This feature is very important for proper client support, as it allows faster operations (using a locally cached archive) as well as searches through the archive.

      Here is a patch to mod_archive that adds preliminary support for this feature. I haven't tested all possible scenarios yet, but the examples listed in XEP-136 seem to be processed correctly.

      In addition, this patch fixes two bugs:

      1) In the current implementation, the values of the 'save' attribute are either 'true' or 'false', but according to XEP-136 they must be one of 'false', 'body', 'message' or 'stream'. Thus 'true' was changed to 'body' ('message' and 'stream' seem to be rarely needed, so supporting them is not that important).

      2) There were two errors in the RSM implementation which prevented it from working in a significant number of cases. To reproduce the errors and check the fix, you can use RSM examples similar to those listed in XEP-136: before the fix, an "after" request gave incorrect results and a "before" request simply resulted in an internal error. The fix seems to solve all the issues I was able to spot (tested on collection listing, collection retrieval and "modified" requests).

      Patch is for https://svn.process-one.net/ejabberd-modules/mod_archive/trunk/src/mod_archive.erl

      Index: mod_archive.erl
      ===================================================================
      --- mod_archive.erl (revision 165)
      +++ mod_archive.erl (working copy)
      @@ -71,6 +71,14 @@
      message_list = [],
      subject = ""}).

      +-record(archive_changes,
      +        {usjs,
      +         us,
      +         jid,
      +         start,
      +         change_type,
      +         change_time}).
      +
      -record(msg,

      {direction, secs, body}

      ).

      %%====================================================================
      @@ -121,6 +129,9 @@
      mnesia:create_table(archive_message,
      [{disc_copies, [node()]},
      {attributes, record_info(fields, archive_message)}]),
      + mnesia:create_table(archive_changes,
      + [{disc_copies, [node()]},
      + {attributes, record_info(fields, archive_changes)}]),
      % mnesia:add_table_index(archive_options, us),
      mnesia:add_table_index(archive_message, us),
      ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50),
      @@ -243,6 +254,7 @@
      "retrieve" -> process_local_iq_retrieve(From, To, IQ);
      "save" -> process_local_iq_store(From, To, IQ);
      "remove" -> process_local_iq_remove(From, To, IQ);
      + "modified" -> process_local_iq_modified(From, To, IQ);
      _ -> IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}

      end
      @@ -256,6 +268,11 @@
      F = fun() ->
      lists:foreach(
      fun(R) ->
      + CL = #archive_changes{usjs = R#archive_message.usjs,
      + us = R#archive_message.us,
      + jid = R#archive_message.jid,
      + start = R#archive_message.start},
      + mnesia:delete_object(CL),
      mnesia:delete_object(R)
      end,
      mnesia:index_read(archive_message, US, #archive_message.us)),
      @@ -323,7 +340,14 @@
      message_list = [Message]};
      [E] -> E#archive_message{message_list=lists:append(E#archive_message.message_list, [Message])}

      end,

      - mnesia:write(NE)
      + mnesia:write(NE),
      + CL = #archive_changes{usjs = NE#archive_message.usjs,
      + us = NE#archive_message.us,
      + jid = NE#archive_message.jid,
      + start = NE#archive_message.start,
      + change_type = changed,
      + change_time = Tm},
      + mnesia:write(CL)
        end),
        NStorages.

      @@ -419,7 +443,11 @@
      AutoSave -> "true";
      true -> "false"
      end,

      - [{xmlelement, "default", [{"save", "false"}, {"otr", "forbid"}], []},
      + SaveMode = if
      + AutoSave -> "body";
      + true -> "false"
      + end,
      + [{xmlelement, "default", [{"save", SaveMode}, {"otr", "forbid"}], []},
      {xmlelement, "auto", [{"save", SaveAttr}], []}].

      @@ -452,7 +480,7 @@
      % parse the list of xml element, and modify the given archive_option
      transaction_parse_save_elem(Options, [{xmlelement, "default", Attrs, _} | Tail]) ->
      V = case xml:get_attr_s("save", Attrs) of
      - "true" -> true;
      + "body" -> true;
      "false" -> false;
      _ -> unset
      end,
        @@ -596,7 +624,16 @@
        case parse_store_element (LUser, LServer, SubEl) of
        {error, E} -> IQ#iq{type = error, sub_el = [SubEl, E]};
        Collection ->
        - case mnesia:transaction(fun() -> mnesia:write(Collection) end) of
        + case mnesia:transaction(fun() ->
        + mnesia:write(Collection),
        + CL = #archive_changes{usjs = Collection#archive_message.us,
        + us = Collection#archive_message.us,
        + jid = Collection#archive_message.jid,
        + start = Collection#archive_message.start,
        + change_type = changed,
        + change_time = get_timestamp()},
        + mnesia:write(CL)
        + end) of
        {atomic, _} ->
        IQ#iq{type = result, sub_el=[]};
        _ ->
        @@ -778,7 +815,16 @@
        end.

        process_remove_index(Index) ->
        - case mnesia:transaction(fun() -> mnesia:delete({archive_message, Index}) end) of
        + case mnesia:transaction(fun() ->
        + {LUser, LServer, Jid, Start} = Index,
        + CL = #archive_changes{usjs = Index,
        + us = {LUser, LServer},
        + jid = Jid,
        + start = Start,
        + change_type = removed,
        + change_time = get_timestamp()},
        + mnesia:write(CL),
        + mnesia:delete({archive_message, Index}) end) of
        {atomic, _} ->
        ok;
        {aborted, _} ->
        @@ -791,7 +837,16 @@
        start='$1', message_list='', subject = ''},
        Guard = [{'>=', '$1', Start},{'<', '$1', End}],

        - lists:foreach(fun(R) -> mnesia:delete_object(R) end,
        + lists:foreach(fun(R) ->
        + CL = #archive_changes{usjs = R#archive_message.usjs,
        + us = R#archive_message.us,
        + jid = R#archive_message.jid,
        + start = R#archive_message.start,
        + change_type = removed,
        + change_time = get_timestamp()},
        + mnesia:write(CL),
        + mnesia:delete_object(R)
        + end,
        mnesia:select(archive_message, [{Pat, Guard, ['$_']}]))
        end,

        @@ -819,6 +874,24 @@
        {error, ?ERRT_INTERNAL_SERVER_ERROR("", "plop")}
        end.

        +% return {ok, [{#archive_message}]} or {error, xmlelement}
        +% With is a tuple Jid, or '_'
        +get_modified(LUser, LServer, Start, End, With) ->
        + case mnesia:transaction(fun() ->
        + Pat = #archive_changes{usjs= '_',
        + us = {LUser, LServer},
        + jid= With,
        + start='$1',
        + change_type='_',
        + change_time='_'},
        + Guard = [{'>=', '$1', Start},{'<', '$1', End}]
        ,
        + mnesia:select(archive_changes, [{Pat, Guard, ['$_']}])
        + end) of
        + {atomic, Result} ->
        + {ok, Result};
        + {aborted, _} ->
        + {error, ?ERRT_INTERNAL_SERVER_ERROR("", "plop")}
        + end.

        % Index is {LUser, LServer, With, Start}
        get_collection(Index) ->
        @@ -832,7 +905,67 @@



        +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        +%% 10. Replication
        +%%

        +
        +process_local_iq_modified(From, _To, #iq{type = Type, sub_el = SubEl} = IQ) ->
        + #jid{luser = LUser, lserver = LServer} = From,
        + case Type of
        + set ->
        + IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]};
        + get ->
        + {xmlelement, _, Attrs, SubEls} = SubEl,
        + RSM = parse_rsm(SubEls),
        + ?MYDEBUG("RSM Results: ~p ~n", [RSM]),
        + Result = case parse_root_argument(Attrs) of
        + {error, E} -> {error, E};
        + {interval, Start, Stop} ->
        + get_modified(LUser, LServer, Start, Stop, '_');
        + {interval, Start, Stop, Jid} ->
        + get_modified(LUser, LServer, Start, Stop, Jid);
        + {index, Jid, Start} ->
        + get_modified(LUser, LServer, Start, infinity, Jid);
        + _ -> {error, ?ERR_BAD_REQUEST}
        + end,
        + case Result of
        + {ok, Items} ->
        + Zero = calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}),
        + FunId = fun(El) ->
        + Secs = element(7,El) - Zero,
        + jlib:now_to_utc_string({Secs div 1000000, Secs rem 1000000, 0})
        + end,
        + FunCompare = fun(Id, El) ->
        + {MegaSecs1, Secs1, _} = jlib:datetime_string_to_timestamp(Id),
        + Id1 = MegaSecs1 * 1000000 + Secs1 - Zero,
        + {MegaSecs2, Secs2, _} = jlib:datetime_string_to_timestamp(FunId(El)),
        + Id2 = MegaSecs2 * 1000000 + Secs2 - Zero,
        + if Id1 == Id2 -> equal;
        + Id1 > Id2 -> greater;
        + Id1 < Id2 -> smaller
        + end
        + end,
        + case catch execute_rsm(RSM, lists:keysort(7, Items), FunId,FunCompare) of
        + {error, R} ->
        + IQ#iq{type = error, sub_el = [SubEl, R]};
        + {'EXIT', Errr} ->
        + ?MYDEBUG("INTERNAL ERROR ~p ~n", [Errr]),
        + IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]};
        + {RSM_Elem, Items2} ->
        + Fun = fun(A) ->
        + Seconds= A#archive_changes.start - Zero,
        + Start2 = jlib:now_to_utc_string({Seconds div 1000000, Seconds rem 1000000, 0}),
        + Args = [{"with", jlib:jid_to_string(A#archive_changes.jid)}, {"start", Start2}],
        + {xmlelement, atom_to_list(A#archive_changes.change_type), Args, []}
        + end,
        + IQ#iq{type = result, sub_el = [{xmlelement, "modified", [{"xmlns", ?NS_ARCHIVE}], lists:append(lists:map(Fun, Items2),[RSM_Elem])}]}
        + end;
        + {error, R} ->
        + IQ#iq{type = error, sub_el = [SubEl, R]}
        + end
        + end.
        +
        %%%%%%%%%%%%%%%%%%%%%%%%%%
        % Utility

      @@ -1029,7 +1162,7 @@
      {index,
      case S of

      {id, IdentIndex} ->
      - integer_to_list(length(List) - list_to_integer(IdentIndex));
      + {id, integer_to_list(length(List) - list_to_integer(IdentIndex))};
        _ -> S
        end};
        _ ->
        @@ -1052,9 +1185,9 @@
        execute_rsm_aux({{id,I}, M, normal} = RSM, [E | Tail], IdFun, Acc) ->
        case IdFun(I, E) of
        smaller ->

        - execute_rsm_aux(RSM, Tail, IdFun, Acc + 1);
        + execute_rsm_aux( {0, M, normal}, [E | Tail], IdFun, Acc);
        _ ->
        - execute_rsm_aux({0, M, normal}, [E | Tail], IdFun, Acc)
        + execute_rsm_aux(RSM, Tail, IdFun, Acc + 1)
        end;

      execute_rsm_aux({{id,_}, _, normal}, [], _, Acc) ->

        Activity

        No work has yet been logged on this issue.

          People

          • Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development