Hi there everyone, I have a problem with sending and receiving TCP bytes. Clients make a request to the server and the server bundles the request as well as the query results (this is a stream converted into bytes) of the request into a buffer for sending to the client. On the client side, the client receives the response and then splits the received buffer into 2: one buffer of fixed length for the protocol request (this is just a packed record) and the other buffer will contain the stream with the request results. However, the size of the data received on the client side rarely matches the data sent by the server. I use {code} ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)])); {code} to compare what is sent to what is received. An abridged version of the server side code is shown below. I used a code snippet that Remy used on this web page to concatenate the two buffers into one buffer for sending to the client: [http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays] {code} procedure TfrmServeur.TCPServerExecute(AContext: TIdContext); var // temporary buffer LProtocolBuffer, LCollectionBuffer, LBuffer: TBytes; // data size in InputBuffer LDataSize: Integer; // protocol structure LProtocol: TProtocol; // we need to HARD CAST AContext to TClientContext // in order to access our custom methods(procedures) LClientContext: TClientContext; // LStream: TMemoryStream; begin // clear the protocol structure InitProtocol(LProtocol); // Bu default, we are not updating a record bUpdateRecord := False; // Hard cast AContext to TClientContext LClientContext := TClientContext(AContext); // if AContext.Connection.IOHandler.InputBufferIsEmpty then begin {$IFDEF CONN_THREAD} // if no data is received within 2 minutes, disconnect the client // because the client is offline if MinutesBetween(Now, TMyData(LClientContext.Data).LastRecvTime) >= 2 then begin LClientContext.Connection.Disconnect(True); // Remove client from listview Exit; end; {$ENDIF} end else if not AContext.Connection.IOHandler.InputBufferIsEmpty then begin // store the size of the InputBuffer of the client LDataSize := LClientContext.Connection.IOHandler.InputBuffer.Size; // in order to prevent spams or to make sure that we have at least // the protocol structure sent we check the size of the InputBuffer if LDataSize >= szProtocol then begin // LStream := TMemoryStream.Create; // try try // Obtient une connexion dans le pool des connexions // et prepare les objets pour les requêtes dm := AcquireDM; qryRead := dm.qryRead; qryWrite := dm.qryWrite; qryRead.SQL.Clear; qryWrite.SQL.Clear; except // Sort de la loop car une connexion n'a pas été obtenu // Reprendre l'execution après << finally >> en bas Abort; end; // try // read the protocol structure from the client so we can handle // the client's request LClientContext.Connection.IOHandler.ReadBytes(LBuffer, szProtocol); // convert the buffer to protocol structure LProtocol := BytesToProtocol(LBuffer); // check client command and act accordingly case LProtocol.Command of {$IFDEF CONN_THREAD} cmdKeepAlive: begin // client is still online so the ping was successful, // reset last received time to 'Now' TMyData(LClientContext.Data).LastRecvTime := Now; // Update listview column - last received time with TUpdateUIClientRecvTime.Create do begin ConnectionTime := TMyData(LClientContext.Data).ConnectionTime; LastRecvTime := TMyData(LClientContext.Data).LastRecvTime; Notify; end; // with TUpdateUIClientRecvTime.Create do end; // cmdKeepAlive: begin {$ENDIF} //************************************************// // // Les listes << Ex >> & les rapports simple // //************************************************// cmdListeVillesEx, cmdListeBeneficiairesEx: begin // with qryRead do begin // LGenerics := TGenerics.Create; // LGenerics.Clear; try SQL.Clear; SQL.Text := SQLQueryString(LProtocol); Open; // while not EOF do begin with LGenerics.Add do begin _Integer := Fields.Fields[0].AsInteger; _String:= Fields.Fields[1].AsWideString; end; Next; end; // close the dataset Close; // copy the collection to a memory stream in preparation for sending to clients SaveCollectionToStream(LGenerics, LStream); // // Source: https://groups.google.com/forum/#!topic/borland.public.delphi.objectpascal/aU7E0y3BYdo // LStream.Position := 0; // SetLength(LCollectionBuffer, LStream.Size); // copy the stream into the LCollectionBuffer LStream.Read(LCollectionBuffer[0], LStream.Size); // FreeAndNil(LGenerics); except if Active then Close; if Assigned(LGenerics) then FreeAndNil(LGenerics); end; // trye end; // with qryRead do end; // cmdListeVillesEx et al end; // case LProtocol.Command of except // Is this the reason why there are memory leaks on line 1440 & 1593 // //// Libere les objets crée auparavant //// Libere les << collections >> //if Assigned(LGenerics) then FreeAndNil(LGenerics); // Error if LGenerics does not exist //// Libère le stream //if Assigned(LStream) then FreeAndNil(LStream); end; finally // Convertir le protocol en bytes LProtocolBuffer := ProtocolToBytes(LProtocol); // merge the buffers into one buffer // Source: http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays SetLength(LBuffer, Length(LProtocolBuffer) + Length(LCollectionBuffer)); if LProtocolBuffer <> nil then Move(LProtocolBuffer[0], LBuffer[0], Length(LProtocolBuffer)); if LCollectionBuffer <> nil then Move(LCollectionBuffer[0], LBuffer[Length(LProtocolBuffer)], Length(LCollectionBuffer)); //ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)])); // send the merged buffer to the client LClientContext.Connection.IOHandler.Write(LBuffer); // clear all the buffers ClearBuffer(LProtocolBuffer); ClearBuffer(LCollectionBuffer); ClearBuffer(LBuffer); // Libere LStream FreeAndNil(LStream); // Initialize(LProtocol); // Libère les composants de données qryRead := nil; qryWrite := nil; qryRead.Free; qryWrite.Free; // Remettre la connection dans le pool des connections ReleaseDM(dm); // Reinitialise les données du << LastRecvTime >> TMyData(LClientContext.Data).LastRecvTime := Now; // Update listview column - last received time with TUpdateUIClientRecvTime.Create do begin ConnectionTime := TMyData(LClientContext.Data).ConnectionTime; LastRecvTime := TMyData(LClientContext.Data).LastRecvTime; Notify; end; // with TUpdateUIClientRecvTime.Create do end; // tryf // end; // if LDataSize >= szProtocol then end; // if AContext.Connection.IOHandler.InputBufferIsEmpty then // Petit pause pour empecher trop de consumation de CPU par le serveur IndySleep(10); end; {code} An abridged version of the client side receiving thread is shown below: {code} // // The EXECUTE method for routing server replies // // OnError just cancel ALL operations & initialize the form // procedure TDataThread.Execute; function SliceByteArray(const B: array of Byte; Start, Len: Integer): TBytes; begin if Start < 0 then Start := 0; if Len < 0 then Len := 0 else if Start >= Length(B) then Len := 0 else if Start + Len > Length(B) then Len := Length(B) - Start; SetLength(Result, Len); if Len > 0 then Move(B[Start], Result[0], Len); end; var LBuffer, LDataBuffer, LProtocolBuffer: TBytes; LProtocol: TProtocol; LDataSize: int64; begin // FreeOnTerminate := not (IsConnectionThread = True); // // while NOT Terminated and ThreadTCPClient.Connected and (LoopCount > 0) and (ThreadError = EmptyStr) do begin // if IsConnectionThread then begin // Ping the server every minute // ONLY the connection thread is able to do this if (MinutesBetween(Now, FLastPingTime) >= 1) then begin // Lock; // initialize the protocol InitProtocol(LProtocol); // set the command to cmdKeepAlive LProtocol.Command := cmdKeepAlive; // Convertir le protocol en bytes LBuffer := ProtocolToBytes(LProtocol); // try try // Envoyez la requête en bytes au serveur ThreadTCPClient.IOHandler.Write(LBuffer); // reset the last ping time to Now LastPingTime := Now; // The synchonize call below can be used as a visual indicator // for users to know if they are still online //Synchronize(@Self.DoConnectionAlive); except // Exception handling code here ! On E: EIdException do begin raise EAbort.Create(E.Message); end; end; finally // Unlock; // clear buffer ClearBuffer(LBuffer); end; end; // if (MinutesBetween(Now, FLastPingTime) >= 1) then end else if not IsConnectionThread then begin if not ThreadTCPClient.IOHandler.InputBufferIsEmpty and (ThreadTCPClient.IOHandler.InputBuffer.Size >= szProtocol) then begin // Lock; // try // try // Initialize the protocol InitProtocol(LProtocol); // // // then read from InputBuffer the size of the protocol structure ThreadTCPClient.IOHandler.InputBuffer.ExtractToBytes(LBuffer); // move a block of memory from the beginning of LBuffer to the location of the size of the protocol structure // LProtocolBuffer SetLength(LProtocolBuffer, szProtocol); LProtocolBuffer := SliceByteArray(LBuffer, 0, szProtocol); // convert array of bytes to protocol LProtocol := BytesToProtocol(LProtocolBuffer); // move a block of memory from the location after the size of the protocol structure // to the end of LBuffer to LCollectionBuffer SetLength(LCollectionBuffer, Length(LBuffer) - szProtocol); LDataBuffer := SliceByteArray(LBuffer, szProtocol, Length(LBuffer) - szProtocol); // The following line is just for debugging. Remove it when all is well ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LDataBuffer), Length(LBuffer)])); // LoopCount := LoopCount - 1; // case LProtocol.Command of cmdListeVillesEx: begin // ControlName := 'Villes'; // Transferez le flux au thread principale Synchronize(@DoListes); end; cmdListeBeneficiairesEx: begin // ControlName := 'Bénéficiaires'; // Transferez le flux au thread principale Synchronize(@DoListes); end; end; // case LProtocol.Command of except On E: EIdSocketError do begin //ThreadError := strProblemeDeConnection; raise EAbort.Create(strProblemeDeConnection) end; On E: EIdConnClosedGracefully do begin //ThreadError := strConnectionFermeParServeur; raise EAbort.Create(strProblemeDeConnection) end; // On E: Exception do begin raise EAbort.Create(E.Message) end; end; // try...except finally // Unlock; // clear buffer ClearBuffer(LBuffer); ClearBuffer(LDataBuffer); ClearBuffer(LProtocolBuffer); // ControlName := ''; end; // try...finally end; // if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then end; // Sleep(10); end; // while NOT Terminated and ThreadTCPClient.Connected do end; {code} Thanks for your kind assistance. Jay Dee
![]() |
0 |
![]() |
Jay wrote: > However, the size of the data received on the client side rarely > matches the data sent by the server. That is because you have logic errors in your code on both sides. You are not handling the reading/writing of your protocol packets correctly, especially in regard to the fact that TCP is a streaming protocol, so there is guarantee that you would have a full protocol packet in the InputBuffer at any given time. Particularly on the client side, because your client has no idea how many bytes the server is actually sending, especially for the collection data. Try something more like this: {code} procedure TfrmServeur.TCPServerConnect(AContext: TIdContext); begin {$IFDEF CONN_THREAD} AContext.Connection.IOHandler.ReadTimeout := 120000; {$ENDIF} end; procedure TfrmServeur.TCPServerExecute(AContext: TIdContext); type PBytes = ^TBytes; var // temporary buffer LProtocolBuffer, LCollectionBuffer, LBuffer: TIdBytes; // protocol structure LProtocol: TProtocol; // LStream: TMemoryStream; dm: TMyDataModule; // replace this with your actual DM class type begin // clear the protocol structure InitProtocol(LProtocol); // Bu default, we are not updating a record bUpdateRecord := False; // // read the protocol structure from the client so we can handle // the client's request AContext.Connection.IOHandler.ReadBytes(LBuffer, szProtocol); // convert the buffer to protocol structure LProtocol := BytesToProtocol(PBytes(@LBuffer)^); // check client command and act accordingly case LProtocol.Command of {$IFDEF CONN_THREAD} cmdKeepAlive: begin // client is still online so the ping was successful end; // cmdKeepAlive: begin {$ENDIF} //************************************************// // // Les listes << Ex >> & les rapports simple // //************************************************// cmdListeVillesEx, cmdListeBeneficiairesEx: begin // try // Obtient une connexion dans le pool des connexions // et prepare les objets pour les requêtes dm := AcquireDM; try dm.qryRead.SQL.Clear; dm.qryWrite.SQL.Clear; except ReleaseDM(dm); raise; end; except // Sort de la loop car une connexion n'a pas été obtenu // Reprendre l'execution après << finally >> en bas Abort; end; // try with dm.qryRead do begin // LGenerics := TGenerics.Create; try // LGenerics.Clear; SQL.Text := SQLQueryString(LProtocol); Open; // try while not EOF do begin with LGenerics.Add do begin _Integer := Fields.Fields[0].AsInteger; _String:= Fields.Fields[1].AsWideString; end; Next; end; finally // close the dataset Close; end; // tryf // copy the collection to a memory stream in preparation for sending to clients LStream := TMemoryStream.Create; try SaveCollectionToStream(LGenerics, LStream); // // Source: https://groups.google.com/forum/#!topic/borland.public.delphi.objectpascal/aU7E0y3BYdo // LStream.Position := 0; // copy the stream into the LCollectionBuffer ReadTIdBytesFromStream(LStream, LCollectionBuffer, LStream.Size); finally // Libere LStream LStream.Free; end; finally LGenerics.Free; end; // tryf end; // with dm.qryRead do finally // Remettre la connection dans le pool des connections ReleaseDM(dm); end; // tryf end; // cmdListeVillesEx et al end; // case LProtocol.Command of // Convertir le protocol en bytes PBytes(@LProtocolBuffer)^ := ProtocolToBytes(LProtocol); //MessageBox(0, PChar(Format('Protocol buffer length %d; Collection buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer)])), '', MB_OK); // send the buffers to the client LClientContext.Connection.IOHandler.Write(Length(LProtocolBuffer)+Length(LCollectionBuffer)); LClientContext.Connection.IOHandler.Write(LProtocolBuffer); LClientContext.Connection.IOHandler.Write(LCollectionBuffer); // Reinitialise les données du << LastRecvTime >> TMyData(AContext.Data).LastRecvTime := Now; // Update listview column - last received time with TUpdateUIClientRecvTime.Create do begin ConnectionTime := TMyData(AContext.Data).ConnectionTime; LastRecvTime := TMyData(AContext.Data).LastRecvTime; Notify; end; // with TUpdateUIClientRecvTime.Create do end; {code} {code} // // The EXECUTE method for routing server replies // // OnError just cancel ALL operations & initialize the form // procedure TDataThread.Execute; type PBytes = ^TBytes; var LBuffer, LDataBuffer, LProtocolBuffer: TIdBytes; LProtocol: TProtocol; LDataSize: Integer; begin // FreeOnTerminate := not (IsConnectionThread = True); // // while (NOT Terminated) and ThreadTCPClient.Connected and (LoopCount > 0) and (ThreadError = EmptyStr) do begin // if IsConnectionThread then begin // Ping the server every minute // ONLY the connection thread is able to do this if (MinutesBetween(Now, FLastPingTime) >= 1) then begin // Lock; try // initialize the protocol InitProtocol(LProtocol); // set the command to cmdKeepAlive LProtocol.Command := cmdKeepAlive; // Convertir le protocol en bytes PBytes(@LBuffer)^ := ProtocolToBytes(LProtocol); // try // Envoyez la requête en bytes au serveur ThreadTCPClient.IOHandler.Write(LBuffer); // reset the last ping time to Now LastPingTime := Now; // The synchonize call below can be used as a visual indicator // for users to know if they are still online //Synchronize(@Self.DoConnectionAlive); except // Exception handling code here ! On E: EIdException do begin raise EAbort.Create(E.Message); end; end; finally // Unlock; // SetLength(LBuffer, 0); end; end; // if (MinutesBetween(Now, FLastPingTime) >= 1) then end; if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then begin // Lock; try // try // Initialize the protocol InitProtocol(LProtocol); // // then read from InputBuffer the size of the protocol structure LDataSize := ThreadTCPClient.IOHandler.InputBuffer.ReadLongint; ThreadTCPClient.IOHandler.InputBuffer.ReadBytes(LBuffer, LDataSize); // // move a block of memory from the beginning of LBuffer to the location of the size of the protocol structure // LProtocolBuffer LProtocolBuffer := Copy(LBuffer, 0, szProtocol); // move a block of memory from the location after the size of the protocol structure // to the end of LBuffer to LCollectionBuffer LDataBuffer := Copy(LBuffer, szProtocol, LDataSize - szProtocol); // The following line is just for debugging. Remove it when all is well MessageBox(0, PChar(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LDataBuffer), Length(LBuffer)]))), '', MB_OK); // Dec(LoopCount); // // convert array of bytes to protocol LProtocol := BytesToProtocol(PBytes(@LProtocolBuffer)^); case LProtocol.Command of cmdListeVillesEx: begin // ControlName := 'Villes'; // Transferez le flux au thread principale Synchronize(@DoListes); end; cmdListeBeneficiairesEx: begin // ControlName := 'Bénéficiaires'; // Transferez le flux au thread principale Synchronize(@DoListes); end; end; // case LProtocol.Command of except On E: EIdSocketError do begin //ThreadError := strProblemeDeConnection; raise EAbort.Create(strProblemeDeConnection) end; On E: EIdConnClosedGracefully do begin //ThreadError := strConnectionFermeParServeur; raise EAbort.Create(strProblemeDeConnection) end; // On E: Exception do begin raise EAbort.Create(E.Message) end; end; // try...except finally // Unlock; // SetLength(LBuffer, 0); SetLength(LProtocolBuffer, 0); SetLength(LDataBuffer, 0); // ControlName := ''; end; // try...finally end; // if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then end; // while NOT Terminated and ThreadTCPClient.Connected do end; {code} -- Remy Lebeau (TeamB)
![]() |
0 |
![]() |
Thanks a lot Remy for your aid & the clarity of your answers. You suggestion worked. Indy has sooo many functions & procedures. I did not know about ReadTIdBytesFromStream & its counterpart WriteTIdBytesToStream that I now use on the client side. I would like your input concerning some code I use to move collections to and from streams. Sometimes they work & sometimes they fail. I haven't been able to figure out why? Here it is {code} // // Persistent collection methods // Source: http://users.atw.hu/delphicikk/listaz.php?id=620&oldal=36 procedure LoadCollectionFromStream(Stream: TStream; Collection: TCollection); begin // with TReader.Create(Stream, 4096) do try CheckValue(vaCollection); ReadCollection(Collection); finally Free; end; end; procedure LoadCollectionFromFile(const FileName: string; Collection: TCollection); var FS: TFileStream; begin FS := TFileStream.Create(FileName, fmOpenRead or fmShareDenyWrite); try LoadCollectionFromStream(FS, Collection); finally FS.Free; end; end; procedure SaveCollectionToStream(Collection: TCollection; Stream: TStream); begin with TWriter.Create(Stream, 4096) do try WriteCollection(Collection); finally Free; end; end; procedure SaveCollectionToFile(Collection: TCollection; const FileName: string); var FS: TFileStream; begin FS := TFileStream.Create(FileName, fmCreate or fmShareDenyWrite); try SaveCollectionToStream(Collection, FS); finally FS.Free; end; end; {code} When the client side thread terminates, the main thread takes over & the procedure below loads each stream into an array of generic collection (FGenerics: array of TGenerics) which was previously created in the mainform's Create method. The generic collection is destroyed in the mainform's Destroy method {code} { TGeneric } TGeneric = class(TBaseItemClass) private { private declarations } FInteger: integer; FString: UTF8string; function GetPropertyCount: integer; //override; function GetPropertyString(Aindex: integer): UTF8string; //override; public { public declarations } published { published declarations } property _Integer: integer read FInteger write FInteger; property _String: UTF8string read FString write FString; end; procedure TfrmChangementSituation.Listes(AStream: TMemoryStream; AName: string); begin // try //ShowMessage('Stream size is ' + IntToStr(AStream.Size)); // if AStream.Size > 0 then begin case AName of 'Mediateur' : LoadCollectionFromStream(AStream, FGenerics[0]); 'Bénéficiaires' : LoadCollectionFromStream(AStream, FGenerics[1]); 'ResidentFoyer' : LoadCollectionFromStream(AStream, FGenerics[2]); 'Patients' : LoadCollectionFromStream(AStream, FGenerics[3]); 'TypeEvenement' : LoadCollectionFromStream(AStream, FGenerics[4]); end; // case AName of // Dec(FControlCount); // frmMain.UpdateProgressBar(1); // Fill the GUI controls using the collections // if the thread has been terminated if FControlCount = 0 then FillControls; end; // if AStream.Size > 0 then except On E: Exception do ShowMessage(E.Message); end; end; {code} This is where the show stops! Sometimes I get read errors inside of LoadCollectionFromStream, the progressbar freezes & the form remains empty! Is there a better way of doing this? Thanks a lot. Jay Dee > {quote:title=Jay Dee wrote:}{quote} > Hi there everyone, > > I have a problem with sending and receiving TCP bytes. Clients make a request to the server and the server bundles the request as well as the query results (this is a stream converted into bytes) of the request into a buffer for sending to the client. On the client side, the client receives the response and then splits the received buffer into 2: one buffer of fixed length for the protocol request (this is just a packed record) and the other buffer will contain the stream with the request results. > > However, the size of the data received on the client side rarely matches the data sent by the server. I use > {code} > ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)])); > {code} > to compare what is sent to what is received. > > An abridged version of the server side code is shown below. I used a code snippet that Remy used on this web page to concatenate the two buffers into one buffer for sending to the client: [http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays] > > {code} > procedure TfrmServeur.TCPServerExecute(AContext: TIdContext); > var > // temporary buffer > LProtocolBuffer, LCollectionBuffer, LBuffer: TBytes; > // data size in InputBuffer > LDataSize: Integer; > // protocol structure > LProtocol: TProtocol; > // we need to HARD CAST AContext to TClientContext > // in order to access our custom methods(procedures) > LClientContext: TClientContext; > // > LStream: TMemoryStream; > begin > // clear the protocol structure > InitProtocol(LProtocol); > // Bu default, we are not updating a record > bUpdateRecord := False; > // Hard cast AContext to TClientContext > LClientContext := TClientContext(AContext); > // > if AContext.Connection.IOHandler.InputBufferIsEmpty then > begin > {$IFDEF CONN_THREAD} > // if no data is received within 2 minutes, disconnect the client > // because the client is offline > if MinutesBetween(Now, TMyData(LClientContext.Data).LastRecvTime) >= 2 then > begin > LClientContext.Connection.Disconnect(True); // Remove client from listview > Exit; > end; > {$ENDIF} > end > else if not AContext.Connection.IOHandler.InputBufferIsEmpty then > begin > // store the size of the InputBuffer of the client > LDataSize := LClientContext.Connection.IOHandler.InputBuffer.Size; > // in order to prevent spams or to make sure that we have at least > // the protocol structure sent we check the size of the InputBuffer > if LDataSize >= szProtocol then > begin > // > LStream := TMemoryStream.Create; > // > try > try > // Obtient une connexion dans le pool des connexions > // et prepare les objets pour les requêtes > dm := AcquireDM; > qryRead := dm.qryRead; > qryWrite := dm.qryWrite; > qryRead.SQL.Clear; > qryWrite.SQL.Clear; > except > // Sort de la loop car une connexion n'a pas été obtenu > // Reprendre l'execution après << finally >> en bas > Abort; > end; > > // > try > // read the protocol structure from the client so we can handle > // the client's request > LClientContext.Connection.IOHandler.ReadBytes(LBuffer, szProtocol); > // convert the buffer to protocol structure > LProtocol := BytesToProtocol(LBuffer); > // check client command and act accordingly > case LProtocol.Command of > {$IFDEF CONN_THREAD} > cmdKeepAlive: > begin > // client is still online so the ping was successful, > // reset last received time to 'Now' > TMyData(LClientContext.Data).LastRecvTime := Now; > // Update listview column - last received time > with TUpdateUIClientRecvTime.Create do > begin > ConnectionTime := TMyData(LClientContext.Data).ConnectionTime; > LastRecvTime := TMyData(LClientContext.Data).LastRecvTime; > Notify; > end; // with TUpdateUIClientRecvTime.Create do > end; // cmdKeepAlive: begin > {$ENDIF} > > //************************************************// > // > // Les listes << Ex >> & les rapports simple > // > //************************************************// > cmdListeVillesEx, cmdListeBeneficiairesEx: > begin > // > with qryRead do > begin > // > LGenerics := TGenerics.Create; > // > LGenerics.Clear; > try > SQL.Clear; > SQL.Text := SQLQueryString(LProtocol); > Open; > // > while not EOF do > begin > with LGenerics.Add do > begin > _Integer := Fields.Fields[0].AsInteger; > _String:= Fields.Fields[1].AsWideString; > end; > Next; > end; > // close the dataset > Close; > // copy the collection to a memory stream in preparation for sending to clients > SaveCollectionToStream(LGenerics, LStream); > // > // Source: https://groups.google.com/forum/#!topic/borland.public.delphi.objectpascal/aU7E0y3BYdo > // > LStream.Position := 0; > // > SetLength(LCollectionBuffer, LStream.Size); > // copy the stream into the LCollectionBuffer > LStream.Read(LCollectionBuffer[0], LStream.Size); > // > FreeAndNil(LGenerics); > except > if Active then Close; > if Assigned(LGenerics) then FreeAndNil(LGenerics); > end; // trye > end; // with qryRead do > end; // cmdListeVillesEx et al > end; // case LProtocol.Command of > except > // Is this the reason why there are memory leaks on line 1440 & 1593 > // > //// Libere les objets crée auparavant > //// Libere les << collections >> > //if Assigned(LGenerics) then FreeAndNil(LGenerics); // Error if LGenerics does not exist > //// Libère le stream > //if Assigned(LStream) then FreeAndNil(LStream); > end; > finally > // Convertir le protocol en bytes > LProtocolBuffer := ProtocolToBytes(LProtocol); > // merge the buffers into one buffer > // Source: http://stackoverflow.com/questions/7520068/best-way-to-combine-multiple-tbytes-arrays > SetLength(LBuffer, Length(LProtocolBuffer) + Length(LCollectionBuffer)); > if LProtocolBuffer <> nil then Move(LProtocolBuffer[0], LBuffer[0], Length(LProtocolBuffer)); > if LCollectionBuffer <> nil then Move(LCollectionBuffer[0], LBuffer[Length(LProtocolBuffer)], Length(LCollectionBuffer)); > > //ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LCollectionBuffer), Length(LBuffer)])); > > // send the merged buffer to the client > LClientContext.Connection.IOHandler.Write(LBuffer); > // clear all the buffers > ClearBuffer(LProtocolBuffer); > ClearBuffer(LCollectionBuffer); > ClearBuffer(LBuffer); > // Libere LStream > FreeAndNil(LStream); > // > Initialize(LProtocol); > // Libère les composants de données > qryRead := nil; > qryWrite := nil; > qryRead.Free; > qryWrite.Free; > // Remettre la connection dans le pool des connections > ReleaseDM(dm); > // Reinitialise les données du << LastRecvTime >> > TMyData(LClientContext.Data).LastRecvTime := Now; > // Update listview column - last received time > with TUpdateUIClientRecvTime.Create do > begin > ConnectionTime := TMyData(LClientContext.Data).ConnectionTime; > LastRecvTime := TMyData(LClientContext.Data).LastRecvTime; > Notify; > end; // with TUpdateUIClientRecvTime.Create do > end; // tryf > // > end; // if LDataSize >= szProtocol then > end; // if AContext.Connection.IOHandler.InputBufferIsEmpty then > > // Petit pause pour empecher trop de consumation de CPU par le serveur > IndySleep(10); > end; > {code} > > An abridged version of the client side receiving thread is shown below: > {code} > // > // The EXECUTE method for routing server replies > // > // OnError just cancel ALL operations & initialize the form > // > procedure TDataThread.Execute; > > function SliceByteArray(const B: array of Byte; Start, Len: Integer): > TBytes; > begin > if Start < 0 then > Start := 0; > if Len < 0 then > Len := 0 > else if Start >= Length(B) then > Len := 0 > else if Start + Len > Length(B) then > Len := Length(B) - Start; > SetLength(Result, Len); > if Len > 0 then > Move(B[Start], Result[0], Len); > end; > > var > LBuffer, LDataBuffer, LProtocolBuffer: TBytes; > LProtocol: TProtocol; > LDataSize: int64; > begin > // > FreeOnTerminate := not (IsConnectionThread = True); > // > // > while NOT Terminated and ThreadTCPClient.Connected > and (LoopCount > 0) and (ThreadError = EmptyStr) do > begin > // > if IsConnectionThread then > begin > // Ping the server every minute > // ONLY the connection thread is able to do this > if (MinutesBetween(Now, FLastPingTime) >= 1) then > begin > // > Lock; > // initialize the protocol > InitProtocol(LProtocol); > // set the command to cmdKeepAlive > LProtocol.Command := cmdKeepAlive; > // Convertir le protocol en bytes > LBuffer := ProtocolToBytes(LProtocol); > // > try > try > // Envoyez la requête en bytes au serveur > ThreadTCPClient.IOHandler.Write(LBuffer); > // reset the last ping time to Now > LastPingTime := Now; > // The synchonize call below can be used as a visual indicator > // for users to know if they are still online > //Synchronize(@Self.DoConnectionAlive); > except > // Exception handling code here ! > On E: EIdException do begin > raise EAbort.Create(E.Message); > end; > end; > finally > // > Unlock; > // clear buffer > ClearBuffer(LBuffer); > end; > end; // if (MinutesBetween(Now, FLastPingTime) >= 1) then > end > else if not IsConnectionThread then > begin > if not ThreadTCPClient.IOHandler.InputBufferIsEmpty and > (ThreadTCPClient.IOHandler.InputBuffer.Size >= szProtocol) then > begin > // > Lock; > // > try > // > try > // Initialize the protocol > InitProtocol(LProtocol); > // > // > // then read from InputBuffer the size of the protocol structure > ThreadTCPClient.IOHandler.InputBuffer.ExtractToBytes(LBuffer); > // move a block of memory from the beginning of LBuffer to the location of the size of the protocol structure > // LProtocolBuffer > SetLength(LProtocolBuffer, szProtocol); > LProtocolBuffer := SliceByteArray(LBuffer, 0, szProtocol); > // convert array of bytes to protocol > LProtocol := BytesToProtocol(LProtocolBuffer); > // move a block of memory from the location after the size of the protocol structure > // to the end of LBuffer to LCollectionBuffer > SetLength(LCollectionBuffer, Length(LBuffer) - szProtocol); > LDataBuffer := SliceByteArray(LBuffer, szProtocol, Length(LBuffer) - szProtocol); > > // The following line is just for debugging. Remove it when all is well > ShowMessage(Format('Protocol buffer length %d; Collection buffer length %d; Merged buffer length %d', [Length(LProtocolBuffer), Length(LDataBuffer), Length(LBuffer)])); > // > LoopCount := LoopCount - 1; > // > case LProtocol.Command of > cmdListeVillesEx: > begin > // > ControlName := 'Villes'; > // Transferez le flux au thread principale > Synchronize(@DoListes); > end; > cmdListeBeneficiairesEx: > begin > // > ControlName := 'Bénéficiaires'; > // Transferez le flux au thread principale > Synchronize(@DoListes); > end; > end; // case LProtocol.Command of > except > On E: EIdSocketError do begin > //ThreadError := strProblemeDeConnection; > raise EAbort.Create(strProblemeDeConnection) > end; > On E: EIdConnClosedGracefully do > begin > //ThreadError := strConnectionFermeParServeur; > raise EAbort.Create(strProblemeDeConnection) > end; > // > On E: Exception do begin > raise EAbort.Create(E.Message) > end; > end; // try...except > finally > // > Unlock; > // clear buffer > ClearBuffer(LBuffer); > ClearBuffer(LDataBuffer); > ClearBuffer(LProtocolBuffer); > // > ControlName := ''; > end; // try...finally > end; // if not ThreadTCPClient.IOHandler.InputBufferIsEmpty then > end; > // > Sleep(10); > end; // while NOT Terminated and ThreadTCPClient.Connected do > end; > {code} > > Thanks for your kind assistance. > > Jay Dee
![]() |
0 |
![]() |
Jay wrote: > I would like your input concerning some code I use to move collections > to and from streams. Sometimes they work & sometimes they fail. I > haven't been able to figure out why? Before Listes() is called, are you seeking the TMemoryStream's Position back to 0? If not, that can cause LoadCollectionFromStream() to fail. -- Remy Lebeau (TeamB)
![]() |
0 |
![]() |
I'll try setting the position of the stream back to zero, test it extensively & let you know how it turns out. As an aside, instead of converting collections & stringlists (I use them also) to streams, can I just use Indy's RawToBytes & BytesToRaw to convert them to/from bytes directly & completely ignore the intermediate use of streams? Jay Dee
![]() |
0 |
![]() |
Jay wrote: > As an aside, instead of converting collections & stringlists (I use > them also) to streams, can I just use Indy's RawToBytes & > BytesToRaw to convert them to/from bytes directly & completely > ignore the intermediate use of streams? TIdIOHandler has methods for reading/writing TStrings data. But you must serialize a TCollection to/from a flat format (which is what TReader/TWriter do for you). Whether you decide to use a TStream or a byte array or whatever to transmit the serialized data, it doesn't really matter. However, if you really want to read/write the serialized data over the socket using a byte array instead of a TStream, you could wrap the bytes in a TBytesStream when calling TReader/TWriter, at least. RawToBytes()/BytesToRaw() simply copy bytes as-is between a byte array and a memory location. That is not adequate enough by itself for proper serialization. -- Remy Lebeau (TeamB)
![]() |
0 |
![]() |
Thanks for your clarification. I did what you suggested (setting stream position to zero before calling Listes) and I got mixed results. Sometimes it worked & sometimes it did not work. I then undefined the connection thread $DEFINE & voila! I was able to retrieve data from the server over 300 times in succession without fail! I reenabled the connection thread & the problem returned. I now realize that the problem is thread deadlock. I am now convinced that the the normal threads & the connection thread are at loggerheads. The locks/unlocks in the code are just TCriticalSection.Enter & TCriticalSection.Leave. The shared resource is an IdTCPClient, that is why I use critical sections to protect it. Aside from the Execute() code above, I also use locks when sending data to the server as follows {code} // Convertir le protocol en bytes LBuffer := ProtocolToBytes(LProtocol); // Lock; // try // try //// clear any residual data that may be in the input buffer //if not ThreadTCPClient.IOHandler.InputBufferIsEmpty and // (ThreadTCPClient.IOHandler.InputBuffer.Size >= szProtocol) then // ThreadTCPClient.IOHandler.InputBuffer.Clear; // Envoyez la requête en bytes au serveur ThreadTCPClient.IOHandler.Write(LBuffer); except // Exception handling code here ! On E: EIdSocketError do begin raise EAbort.Create(strProblemeDeConnection) end; On E: EIdConnClosedGracefully do begin raise EAbort.Create(strProblemeDeConnection) end; // On E: Exception do begin raise EAbort.Create(E.Message) end; end; finally // Unlock; // ClearBuffer(LBuffer); end; // try....finally {code} I could abandon the idea of a HEARTBEAT connection thread but it is very useful & I'd like to keep it. What do you think I should do to avoid thread deadlock? Thanks, Jay Dee
![]() |
0 |
![]() |
Jay wrote: > I could abandon the idea of a HEARTBEAT connection thread but it > is very useful & I'd like to keep it. What do you think I should do to > avoid thread deadlock? Why are you not sending heartbeats on all of the connections? Your protocol handlers are already set up to handle them, so why not just enable them always? It doesn't really make sense to have a dedicated connection thread just for heartbeats, when the other threads can (and should) handle their own local heartbeats. -- Remy Lebeau (TeamB)
![]() |
0 |
![]() |
Hi there Remy, I did not put a heartbeat in all of the threads because the threads terminate on completion (FreeOnTerminate := True) so if a client decides to take a coffee break or is doing something else on the computer, he is still connected but the only thread that is active will be the connection thread (FreeOnTerminate := False). That is why I kept the connection thread. I even have a class variable FLastPingTime declared in the thread class thinking that ALL threads could have this heartbeat so technically FLastPingTime could be set to Now everytime a thread sends data to the server. The question is how do I take account of pauses (no data request is being sent/received) that could exceed 2 minutes (e.g coffee breaks etc) while still connected to the server? I'm certain that solving this will solve this problem because like I said in my last reply, once I undefine {code} //{$DEFINE CONN_THREAD} {code} everything works perfectly. Thanks, Jay Dee
![]() |
0 |
![]() |